| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache; |
| |
| import static java.lang.System.out; |
| import static org.apache.geode.internal.cache.DistributedCacheOperation.SLOW_DISTRIBUTION_MS; |
| import static org.apache.geode.internal.cache.InitialImageOperation.GIITestHookType.DuringPackingImage; |
| import static org.apache.geode.internal.cache.InitialImageOperation.getGIITestHookForCheckingPurpose; |
| import static org.apache.geode.internal.cache.InitialImageOperation.resetGIITestHook; |
| import static org.apache.geode.internal.cache.InitialImageOperation.setGIITestHook; |
| import static org.apache.geode.test.dunit.Host.getHost; |
| import static org.apache.geode.test.dunit.LogWriterUtils.getLogWriter; |
| import static org.apache.geode.test.dunit.Wait.pause; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.DataInputStream; |
| import java.io.IOException; |
| import java.util.concurrent.CountDownLatch; |
| |
| import org.junit.Test; |
| |
| import org.apache.geode.DataSerializer; |
| import org.apache.geode.GemFireIOException; |
| import org.apache.geode.cache.AttributesFactory; |
| import org.apache.geode.cache.Cache; |
| import org.apache.geode.cache.CacheException; |
| import org.apache.geode.cache.DataPolicy; |
| import org.apache.geode.cache.RegionAttributes; |
| import org.apache.geode.cache.RegionFactory; |
| import org.apache.geode.cache.Scope; |
| import org.apache.geode.distributed.internal.ClusterDistributionManager; |
| import org.apache.geode.distributed.internal.DistributionConfig; |
| import org.apache.geode.distributed.internal.DistributionMessage; |
| import org.apache.geode.distributed.internal.DistributionMessageObserver; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.HeapDataOutputStream; |
| import org.apache.geode.internal.cache.DestroyOperation.DestroyMessage; |
| import org.apache.geode.internal.cache.DistributedTombstoneOperation.TombstoneMessage; |
| import org.apache.geode.internal.cache.InitialImageOperation.GIITestHook; |
| import org.apache.geode.internal.cache.InitialImageOperation.GIITestHookType; |
| import org.apache.geode.internal.cache.InitialImageOperation.RequestImageMessage; |
| import org.apache.geode.internal.cache.UpdateOperation.UpdateMessage; |
| import org.apache.geode.internal.cache.entries.DiskEntry; |
| import org.apache.geode.internal.cache.persistence.DiskStoreID; |
| import org.apache.geode.internal.cache.versions.RegionVersionVector; |
| import org.apache.geode.internal.cache.versions.VersionTag; |
| import org.apache.geode.internal.serialization.Version; |
| import org.apache.geode.test.awaitility.GeodeAwaitility; |
| import org.apache.geode.test.dunit.Assert; |
| import org.apache.geode.test.dunit.AsyncInvocation; |
| import org.apache.geode.test.dunit.Host; |
| import org.apache.geode.test.dunit.IgnoredException; |
| import org.apache.geode.test.dunit.Invoke; |
| import org.apache.geode.test.dunit.LogWriterUtils; |
| import org.apache.geode.test.dunit.SerializableCallable; |
| import org.apache.geode.test.dunit.SerializableRunnable; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.dunit.Wait; |
| import org.apache.geode.test.dunit.WaitCriterion; |
| import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; |
| |
| |
| public class GIIDeltaDUnitTest extends JUnit4CacheTestCase { |
| |
| VM P; // GII provider |
| VM R; // GII requester |
| InternalDistributedMember R_ID; // DistributedMember ID of R |
| |
| private static final long MAX_WAIT = 30000; |
| // protected static String REGION_NAME = GIIDeltaDUnitTest.class.getSimpleName()+"_Region"; |
| protected static String REGION_NAME = "_Region"; |
| final String expectedExceptions = GemFireIOException.class.getName(); |
| protected IgnoredException expectedEx; |
| static Object giiSyncObject = new Object(); |
| |
| public GIIDeltaDUnitTest() { |
| super(); |
| } |
| |
| @Override |
| public final void postSetUp() throws Exception { |
| Invoke.invokeInEveryVM(GIIDeltaDUnitTest.class, "setRegionName", |
| new Object[] {getUniqueName()}); |
| setRegionName(getUniqueName()); |
| } |
| |
| public static void setRegionName(String testName) { |
| REGION_NAME = testName + "_Region"; |
| } |
| |
| @Override |
| public final void preTearDownCacheTestCase() throws Exception { |
| P.invoke(() -> GIIDeltaDUnitTest.resetSlowGII()); |
| R.invoke(() -> GIIDeltaDUnitTest.resetSlowGII()); |
| P.invoke(() -> InitialImageOperation.resetAllGIITestHooks()); |
| R.invoke(() -> InitialImageOperation.resetAllGIITestHooks()); |
| changeUnfinishedOperationLimit(R, 10000); |
| changeForceFullGII(R, false, false); |
| changeForceFullGII(P, false, false); |
| P = null; |
| R = null; |
| } |
| |
| @Override |
| public final void postTearDownCacheTestCase() throws Exception { |
| // clean up the test hook, which can be moved to CacheTestCase |
| DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0; |
| if (expectedEx != null) { |
| expectedEx.remove(); |
| } |
| } |
| |
| // private VersionTag getVersionTag(VM vm, final String key) { |
| // SerializableCallable getVersionTag = new SerializableCallable("verify recovered entry") { |
| // public Object call() { |
| // VersionTag tag = CCRegion.getVersionTag(key); |
| // return tag; |
| // |
| // } |
| // }; |
| // return (VersionTag)vm.invoke(getVersionTag); |
| // } |
| |
| public InternalDistributedMember getDistributedMemberID(VM vm) { |
| SerializableCallable getID = new SerializableCallable("get member id") { |
| @Override |
| public Object call() { |
| return getCache().getDistributedSystem().getDistributedMember(); |
| } |
| }; |
| InternalDistributedMember id = (InternalDistributedMember) vm.invoke(getID); |
| return id; |
| } |
| |
| protected void prepareForEachTest() { |
| Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| |
| createDistributedRegion(vm0); |
| createDistributedRegion(vm1); |
| assignVMsToPandR(vm0, vm1); |
| // from now on, use P and R as |
| // vmhttps://wiki.gemstone.com/display/gfepersistence/DeltaGII+Spec+for+8.0 |
| expectedEx = IgnoredException.addIgnoredException(expectedExceptions); |
| } |
| |
| // these steps are shared by all test cases |
| private void prepareCommonTestData(int p_version) { |
| // operation P1: region.put("key") at P. After the operation, region version for P becomes 1 |
| R_ID = getDistributedMemberID(R); |
| |
| assertDeltaGIICountBeZero(P); |
| assertDeltaGIICountBeZero(R); |
| doOnePut(P, 1, "key1"); |
| doOnePut(R, 1, "key2"); |
| doOnePut(R, 2, "key5"); |
| |
| createConflictOperationsP2R3(); |
| |
| for (int i = 3; i <= p_version; i++) { |
| switch (i) { |
| case 3: |
| doOneDestroy(P, 3, "key1"); |
| break; |
| case 4: |
| doOneDestroy(P, 4, "key2"); |
| break; |
| case 5: |
| doOnePut(P, 5, "key1"); |
| break; |
| case 6: |
| doOnePut(P, 6, "key3"); |
| default: |
| // donothing |
| } |
| } |
| // at this moment, cache has key1, key3, key5 |
| } |
| |
| private void createConflictOperationsP2R3() { |
| final DiskStoreID memberP = getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| final long blocklist[] = {2, 3}; |
| |
| P.invoke(() -> GIIDeltaDUnitTest.slowGII(blocklist)); |
| R.invoke(() -> GIIDeltaDUnitTest.slowGII(blocklist)); |
| AsyncInvocation async1 = doOnePutAsync(P, 2, "key1"); |
| AsyncInvocation async2 = doOnePutAsync(R, 3, "key1"); |
| |
| // wait for the local puts are done at P & R before distribution |
| waitForToVerifyRVV(P, memberP, 2, null, 0); // P's rvv=p2, gc=0 |
| waitForToVerifyRVV(P, memberR, 2, null, 0); // P's rvv=r2, gc=0 |
| waitForToVerifyRVV(R, memberP, 1, null, 0); // P's rvv=p2, gc=0 |
| waitForToVerifyRVV(R, memberR, 3, null, 0); // P's rvv=r2, gc=0 |
| |
| // new Object[] { memberP, 2, 3, 0, 0, false } |
| P.invoke(() -> GIIDeltaDUnitTest.resetSlowGII()); |
| R.invoke(() -> GIIDeltaDUnitTest.resetSlowGII()); |
| |
| // should wait for async calls to finish before going on |
| checkAsyncCall(async1); |
| checkAsyncCall(async2); |
| |
| // verify RVVs |
| waitForToVerifyRVV(P, memberP, 2, null, 0); // P's rvv=p2, gc=0 |
| waitForToVerifyRVV(P, memberR, 3, null, 0); // P's rvv=r3, gc=0 |
| waitForToVerifyRVV(R, memberP, 2, null, 0); // P's rvv=p2, gc=0 |
| waitForToVerifyRVV(R, memberR, 3, null, 0); // P's rvv=r3, gc=0 |
| |
| // verify P won the conflict check |
| waitToVerifyKey(P, "key1", generateValue(P)); |
| waitToVerifyKey(R, "key1", generateValue(P)); |
| } |
| |
| private void createUnfinishedOperationsR4R5() { |
| final DiskStoreID memberP = getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| final long[] exceptionlist = {4, 5}; |
| |
| // let r6 to succeed, r4,r5 to be blocked |
| R.invoke(() -> GIIDeltaDUnitTest.slowGII(exceptionlist)); |
| AsyncInvocation async1 = doOnePutAsync(R, 4, "key4"); |
| waitForToVerifyRVV(R, memberR, 4, null, 0); // P's rvv=r4, gc=0 |
| |
| AsyncInvocation async2 = doOneDestroyAsync(R, 5, "key5"); |
| waitForToVerifyRVV(R, memberR, 5, null, 0); // P's rvv=r5, gc=0 |
| |
| doOnePut(R, 6, "key1"); // r6 will pass |
| waitForToVerifyRVV(R, memberR, 6, null, 0); // P's rvv=r6, gc=0 |
| |
| // P should have exception list R4,R5 now |
| waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6(3-6), gc=0 |
| } |
| |
| /** |
| * vm0 and vm1 are peers, each holds a DR. Each does a few operations to make RVV=P7,R6, |
| * RVVGC=P4,R0 for both members. vm1 becomes offline then restarts. The deltaGII should only |
| * exchange RVV. No need to send data from vm0 to vm1. |
| */ |
| @Test |
| public void testDeltaGIIWithSameRVV() throws Throwable { |
| prepareForEachTest(); |
| final DiskStoreID memberP = getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| |
| assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS); |
| prepareCommonTestData(6); |
| |
| // force tombstone GC to let RVVGC to become P4:R0 |
| forceGC(P, 2); |
| waitForToVerifyRVV(P, memberP, 6, null, 4); // P's rvv=p6, gc=4 |
| waitForToVerifyRVV(P, memberR, 3, null, 0); // P's rvv=r3, gc=0 |
| |
| // let r4,r5,r6 to succeed |
| doOnePut(R, 4, "key4"); |
| doOneDestroy(R, 5, "key5"); |
| doOnePut(R, 6, "key1"); |
| |
| // let p7 to succeed |
| doOnePut(P, 7, "key1"); |
| |
| waitForToVerifyRVV(P, memberP, 7, null, 4); // P's rvv=p7, gc=4 |
| waitForToVerifyRVV(P, memberR, 6, null, 0); // P's rvv=r6, gc=0 |
| |
| waitForToVerifyRVV(R, memberP, 7, null, 4); // P's rvv=p7, gc=4 |
| waitForToVerifyRVV(R, memberR, 6, null, 0); // P's rvv=r6, gc=0 |
| // now the rvv and rvvgc at P and R should be the same |
| |
| // save R's rvv in byte array, check if it will be fullGII |
| byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME); |
| // shutdown R and restart |
| closeCache(R); |
| |
| checkIfFullGII(P, REGION_NAME, R_rvv_bytes, false); |
| createDistributedRegion(R); |
| waitForToVerifyRVV(R, memberP, 7, null, 4); // P's rvv=p7, gc=4 |
| waitForToVerifyRVV(R, memberR, 6, null, 0); // P's rvv=r6, gc=0 |
| |
| RegionVersionVector p_rvv = getRVV(P); |
| RegionVersionVector r_rvv = getRVV(R); |
| assertSameRVV(p_rvv, r_rvv); // after gii, rvv should be the same |
| |
| // If fullGII, the key size in gii chunk is 4, i.e. key1,key3,key4,key5(tombstone). key2 is |
| // GCed. |
| // If delta GII, the key size should be 0 |
| verifyDeltaSizeFromStats(R, 0, 1); |
| } |
| |
| /** |
| * vm0 and vm1 are peers, each holds a DR. create some exception list. Before GII, P's RVV is |
| * P7,R6(3-6), R's RVV is P6,R6, RVVGC are both P4,R0 vm1 becomes offline then restarts. |
| * https://wiki.gemstone.com/display/gfepersistence/DeltaGII+Spec+for+8.0 The deltaGII should send |
| * delta to R, revoke unfinished opeation R4,R5 |
| */ |
| @Test |
| public void testDeltaGIIWithExceptionList() throws Throwable { |
| prepareForEachTest(); |
| final DiskStoreID memberP = getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| final long[] exceptionlist = {4, 5}; |
| |
| assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS); |
| prepareCommonTestData(6); |
| VersionTag expect_tag = getVersionTag(R, "key5"); |
| |
| // force tombstone GC to let RVVGC to become P4:R0 |
| forceGC(P, 2); |
| waitForToVerifyRVV(P, memberP, 6, null, 4); // P's rvv=p6, gc=4 |
| waitForToVerifyRVV(P, memberR, 3, null, 0); // P's rvv=r3, gc=0 |
| |
| createUnfinishedOperationsR4R5(); |
| |
| // now P's cache still only has key1, key3, key5 |
| byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME); |
| closeCache(R); |
| // p7 only apply at P |
| doOnePut(P, 7, "key1"); |
| |
| // restart and gii |
| checkIfFullGII(P, REGION_NAME, R_rvv_bytes, false); |
| createDistributedRegion(R); |
| waitForToVerifyRVV(P, memberP, 7, null, 4); // P's rvv=p7, gc=4 |
| waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6, gc=0 |
| waitForToVerifyRVV(R, memberP, 7, null, 4); // R's rvv=p7, gc=4 |
| waitForToVerifyRVV(R, memberR, 6, exceptionlist, 0); // R's rvv=r6, gc=0 |
| |
| RegionVersionVector p_rvv = getRVV(P); |
| RegionVersionVector r_rvv = getRVV(R); |
| assertSameRVV(p_rvv, r_rvv); |
| |
| // If fullGII, the key size in gii chunk is 3, i.e. key1,key3,key5. key2 is GCed. |
| // If delta GII, the key size should be 2, i.e. P7(key1) and (key5(T) which is unfinished |
| // operation) |
| verifyDeltaSizeFromStats(R, 2, 1); |
| // verify unfinished op for key5 is revoked |
| waitToVerifyKey(R, "key5", generateValue(R)); |
| VersionTag tag = getVersionTag(R, "key5"); |
| assertTrue(expect_tag.equals(tag)); |
| |
| // restart P, since R has received exceptionlist R4,R5 from P |
| closeCache(P); |
| createDistributedRegion(P); |
| waitForToVerifyRVV(P, memberP, 7, null, 4); // P's rvv=p7, gc=4 |
| waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6, gc=0 |
| // If fullGII, the key size in gii chunk is 3, i.e. key1,key3,key5. key4 is removed as |
| // unfinished op |
| // If deltaGII, the key size should be 0 |
| verifyDeltaSizeFromStats(P, 0, 1); |
| } |
| |
| /** |
| * vm0 and vm1 are peers, each holds a DR. create some exception list. Before GII, P's RVV is |
| * P6,R6(3-6), R's RVV is P6,R6, RVVGC are both P4,R0 vm1 becomes offline then restarts. The |
| * deltaGII should send delta which only contains unfinished operation R4,R5 |
| */ |
| @Test |
| public void testDeltaGIIWithOnlyUnfinishedOp() throws Throwable { |
| prepareForEachTest(); |
| final DiskStoreID memberP = getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| final long[] exceptionlist = {4, 5}; |
| |
| assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS); |
| prepareCommonTestData(6); |
| VersionTag expect_tag = getVersionTag(R, "key5"); |
| |
| // force tombstone GC to let RVVGC to become P4:R0 |
| forceGC(P, 2); |
| waitForToVerifyRVV(P, memberP, 6, null, 4); // P's rvv=p6, gc=4 |
| waitForToVerifyRVV(P, memberR, 3, null, 0); // P's rvv=r3, gc=0 |
| |
| createUnfinishedOperationsR4R5(); |
| |
| // now P's cache still only has key1, key3, key5 |
| byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME); |
| closeCache(R); |
| |
| // restart and gii |
| checkIfFullGII(P, REGION_NAME, R_rvv_bytes, false); |
| createDistributedRegion(R); |
| waitForToVerifyRVV(P, memberP, 6, null, 4); // P's rvv=p6, gc=4 |
| waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6, gc=0 |
| waitForToVerifyRVV(R, memberP, 6, null, 4); // R's rvv=p6, gc=4 |
| waitForToVerifyRVV(R, memberR, 6, exceptionlist, 0); // R's rvv=r6, gc=0 |
| |
| RegionVersionVector p_rvv = getRVV(P); |
| RegionVersionVector r_rvv = getRVV(R); |
| assertSameRVV(p_rvv, r_rvv); |
| |
| // If fullGII, the key size in gii chunk is 3, i.e. key1,key3,key5. key2 is GCed. |
| // If delta GII, the key size should be 1 (key5(T) which is unfinished operation) |
| verifyDeltaSizeFromStats(R, 1, 1); |
| // verify unfinished op for key5 is revoked |
| waitToVerifyKey(R, "key5", generateValue(R)); |
| VersionTag tag = getVersionTag(R, "key5"); |
| assertTrue(expect_tag.equals(tag)); |
| |
| // restart P, since R has received exceptionlist R4,R5 from P |
| closeCache(P); |
| createDistributedRegion(P); |
| waitForToVerifyRVV(P, memberP, 6, null, 4); // P's rvv=p6, gc=4 |
| waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6, gc=0 |
| // If fullGII, the key size in gii chunk is 3, i.e. key1,key3,key5. key4 is removed as |
| // unfinished op |
| // If deltaGII, the key size should be 0 |
| verifyDeltaSizeFromStats(P, 0, 1); |
| |
| // restart R, to make sure the unfinished op is handled correctly |
| forceGC(R, 1); // for bug 47616 |
| waitForToVerifyRVV(P, memberR, 6, null, 5); // P's rvv=R6, gc=5 |
| waitForToVerifyRVV(R, memberR, 6, null, 5); // P's rvv=R6, gc=5 |
| |
| closeCache(R); |
| createDistributedRegion(R); |
| // If fullGII, the key size in gii chunk is 3, i.e. key1,key3,key5. key4 is removed as |
| // unfinished op |
| // If deltaGII, the key size should be 0 |
| verifyDeltaSizeFromStats(R, 0, 1); |
| // verify unfinished op for key5 is revoked |
| waitToVerifyKey(R, "key5", generateValue(R)); |
| tag = getVersionTag(R, "key5"); |
| assertTrue(expect_tag.equals(tag)); |
| } |
| |
| /** |
| * This is to test a race condition for bug#47616 vm0 and vm1 are peers, each holds a DR. create |
| * some exception list. Before GII, P's RVV is P6,R6(3-6), R's RVV is P6,R6, RVVGC are both P4,R0 |
| * vm1 becomes offline then restarts. The deltaGII should send delta which only contains |
| * unfinished opeation R4,R5 |
| */ |
| @Test |
| public void testDeltaGIIWithOnlyUnfinishedOp_GCAtR() throws Throwable { |
| prepareForEachTest(); |
| final DiskStoreID memberP = getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| final long[] exceptionlist = {4, 5}; |
| |
| assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS); |
| prepareCommonTestData(6); |
| VersionTag expect_tag = getVersionTag(R, "key5"); |
| |
| // force tombstone GC to let RVVGC to become P4:R0 |
| forceGC(P, 2); |
| waitForToVerifyRVV(P, memberP, 6, null, 4); // P's rvv=p6, gc=4 |
| waitForToVerifyRVV(P, memberR, 3, null, 0); // P's rvv=r3, gc=0 |
| |
| createUnfinishedOperationsR4R5(); |
| waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6, gc=0 |
| |
| // 2 |
| R.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| Mycallback myAfterRequestRVV = new Mycallback(GIITestHookType.AfterRequestRVV, REGION_NAME); |
| InitialImageOperation.setGIITestHook(myAfterRequestRVV); |
| } |
| }); |
| |
| // now P's cache still only has key1, key3, key5 |
| byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME); |
| closeCache(R); |
| |
| // restart and gii |
| checkIfFullGII(P, REGION_NAME, R_rvv_bytes, false); |
| AsyncInvocation async3 = createDistributedRegionAsync(R); |
| |
| // 2 |
| waitForCallbackStarted(R, GIITestHookType.AfterRequestRVV); |
| forceGC(R, 1); |
| R.invoke(() -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterRequestRVV, true)); |
| |
| async3.join(MAX_WAIT); |
| |
| // verify unfinished op for key5 is revoked |
| waitToVerifyKey(R, "key5", generateValue(R)); |
| VersionTag tag = getVersionTag(R, "key5"); |
| assertTrue(expect_tag.equals(tag)); |
| verifyTombstoneExist(R, "key5", false, false); |
| |
| // If fullGII, the key size in gii chunk is 3, i.e. key1,key3,key5. key2 is GCed. |
| // If delta GII, the key size should be 1 (key5(T) which is unfinished operation) |
| verifyDeltaSizeFromStats(R, 3, 0); |
| } |
| |
| /** |
| * vm0 and vm1 are peers, each holds a DR. create some exception list. Then shutdown R. Do |
| * tombstone GC at P only. Before GII, P's RVV is P7,R6(3-6), RVVGC is P4,R0; R's RVV is P6,R6, |
| * RVVGC are both P0,R0 vm1 becomes offline then restarts. |
| * https://wiki.gemstone.com/display/gfepersistence/DeltaGII+Spec+for+8.0 The deltaGII should send |
| * delta to R, revoke unfinished opeation R4,R5 |
| */ |
| @Test |
| public void testDeltaGIIWithDifferentRVVGC() throws Throwable { |
| final String testcase = "testDeltaGIIWithExceptionList"; |
| prepareForEachTest(); |
| final DiskStoreID memberP = getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| final long[] exceptionlist = {4, 5}; |
| |
| assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS); |
| prepareCommonTestData(6); |
| waitForToVerifyRVV(P, memberP, 6, null, 0); // P's rvv=p6, gc=0 |
| VersionTag expect_tag = getVersionTag(R, "key5"); |
| |
| createUnfinishedOperationsR4R5(); |
| |
| byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME); |
| closeCache(R); |
| |
| // force tombstone GC to let RVVGC to become P4:R0 |
| changeTombstoneTimout(R, MAX_WAIT); |
| changeTombstoneTimout(P, MAX_WAIT); |
| Wait.pause((int) MAX_WAIT); |
| forceGC(P, 2); |
| waitForToVerifyRVV(P, memberP, 6, null, 4); // P's rvv=p6, gc=4 |
| |
| // p7 only apply at P |
| doOnePut(P, 7, "key1"); |
| waitForToVerifyRVV(P, memberP, 7, null, 4); // P's rvv=p7, gc=4 |
| verifyTombstoneExist(P, "key2", false, false); |
| |
| // restart R and gii |
| checkIfFullGII(P, REGION_NAME, R_rvv_bytes, false); |
| createDistributedRegion(R); |
| waitForToVerifyRVV(R, memberP, 7, null, 4); // R's rvv=p7, gc=4 |
| waitForToVerifyRVV(R, memberR, 6, exceptionlist, 0); // R's rvv=r6, gc=0 |
| |
| RegionVersionVector p_rvv = getRVV(P); |
| RegionVersionVector r_rvv = getRVV(R); |
| assertSameRVV(p_rvv, r_rvv); |
| |
| // If fullGII, the key size in gii chunk is 3, i.e. key1,key3,key5. key2 is GCed. |
| // If delta GII, the key size should be 2, i.e. P7 (key1) and (key5(T) which is unfinished |
| // operation) |
| verifyDeltaSizeFromStats(R, 2, 1); |
| // verify unfinished op for key5 is revoked |
| waitToVerifyKey(R, "key5", generateValue(R)); |
| VersionTag tag = getVersionTag(R, "key5"); |
| assertTrue(expect_tag.equals(tag)); |
| verifyTombstoneExist(R, "key2", false, false); |
| verifyTombstoneExist(P, "key2", false, false); |
| } |
| |
| /** |
| * vm0 and vm1 are peers, each holds a DR. Let provider to have higher RVVGC than requester's RVV |
| * It should trigger fullGII |
| * |
| * This test also verify tombstoneGC happened in middle of GII, but BEFORE GII thread got GIILock. |
| * i.e. before GII, P's RVVGC=P0,R0, upon received RequestImageMessage, it becomes P4,R0 it should |
| * cause the fullGII. |
| */ |
| @Test |
| public void testFullGIITriggeredByHigherRVVGC() throws Throwable { |
| prepareForEachTest(); |
| final DiskStoreID memberP = getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| final long[] exceptionlist = {4, 5}; |
| |
| assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS); |
| prepareCommonTestData(3); |
| waitForToVerifyRVV(P, memberP, 3, null, 0); // P's rvv=p3, gc=0 |
| |
| createUnfinishedOperationsR4R5(); |
| waitForToVerifyRVV(R, memberP, 3, null, 0); // R's rvv=p3, gc=0 |
| |
| byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME); |
| closeCache(R); |
| |
| // p4-7 only apply at P |
| doOneDestroy(P, 4, "key2"); |
| doOnePut(P, 5, "key1"); |
| doOnePut(P, 6, "key3"); |
| doOnePut(P, 7, "key1"); |
| |
| |
| // add test hook |
| // 7 |
| P.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| Mycallback myAfterReceivedRequestImage = |
| new Mycallback(GIITestHookType.AfterReceivedRequestImage, REGION_NAME); |
| InitialImageOperation.setGIITestHook(myAfterReceivedRequestImage); |
| } |
| }); |
| |
| // restart R and gii, it will be blocked at test hook |
| AsyncInvocation async3 = createDistributedRegionAsync(R); |
| // 7 |
| waitForCallbackStarted(P, GIITestHookType.AfterReceivedRequestImage); |
| |
| // force tombstone GC to let RVVGC to become P4:R0, but R already sent its old RVV/RVVGC over |
| // this tombstone GC happens BEFORE GII thread got the GIILock, so it will be fullGCC |
| forceGC(P, 2); |
| waitForToVerifyRVV(P, memberP, 7, null, 4); // P's rvv=p7, gc=4 |
| waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6(3-6), gc=0 |
| checkIfFullGII(P, REGION_NAME, R_rvv_bytes, true); |
| |
| // let GII continue |
| P.invoke(() -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterReceivedRequestImage, |
| true)); |
| async3.join(MAX_WAIT); |
| |
| waitForToVerifyRVV(R, memberP, 7, null, 4); // R's rvv=p7, gc=4 |
| waitForToVerifyRVV(R, memberR, 6, exceptionlist, 0); // R's rvv=r6, gc=0 |
| |
| RegionVersionVector p_rvv = getRVV(P); |
| RegionVersionVector r_rvv = getRVV(R); |
| assertSameRVV(p_rvv, r_rvv); |
| |
| // In fullGII, the key size in gii chunk is 3, i.e. key1,key3,key5. key2 is GCed. |
| verifyDeltaSizeFromStats(R, 3, 0); |
| } |
| |
| /** |
| * vm0(P), vm1(R), vm2(T) are peers, each holds a DR. shutdown vm1(R), vm2(T) Let provider to have |
| * higher RVVGC than requester's RVV when vm1 and vm2 are offline Restart R, It should trigger |
| * fullGII If R did not save the rvvgc, restarted R will have a way smaller rvvgc (maybe the same |
| * as T's) Let T requests GII from R, it wll become deltaGII, which is wrong. |
| */ |
| @Test |
| public void testSavingRVVGC() throws Throwable { |
| prepareForEachTest(); |
| final DiskStoreID memberP = getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| final long[] exceptionlist = {4, 5}; |
| |
| Host host = Host.getHost(0); |
| VM T = host.getVM(2); |
| createDistributedRegion(T); |
| final DiskStoreID memberT = getMemberID(T); |
| |
| assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS); |
| prepareCommonTestData(3); |
| waitForToVerifyRVV(P, memberP, 3, null, 0); // P's rvv=p3, gc=0 |
| |
| waitForToVerifyRVV(R, memberP, 3, null, 0); // R's rvv=p3, gc=0 |
| waitForToVerifyRVV(T, memberP, 3, null, 0); // T's rvv=p3, gc=0 |
| |
| closeCache(T); |
| closeCache(R); |
| |
| // p4-7 only apply at P |
| doOneDestroy(P, 4, "key2"); |
| doOnePut(P, 5, "key1"); |
| doOnePut(P, 6, "key3"); |
| doOnePut(P, 7, "key1"); |
| // force tombstone GC to let RVVGC to become P4:R0 |
| forceGC(P, 2); |
| |
| // restart R and gii, it will be blocked at test hook |
| createDistributedRegion(R); |
| |
| waitForToVerifyRVV(P, memberP, 7, null, 4); // P's rvv=p7, gc=4 |
| waitForToVerifyRVV(P, memberR, 3, null, 0); // P's rvv=r3, gc=0 |
| waitForToVerifyRVV(R, memberP, 7, null, 4); // R's rvv=p7, gc=4 |
| waitForToVerifyRVV(R, memberR, 3, null, 0); // R's rvv=r3, gc=0 |
| |
| // RegionVersionVector p_rvv = getRVV(P); |
| // RegionVersionVector r_rvv = getRVV(R); |
| // assertSameRVV(p_rvv, r_rvv); |
| |
| // In fullGII, the key size in gii chunk is 3, i.e. key1,key3,key5. key2 is GCed. |
| verifyDeltaSizeFromStats(R, 3, 0); |
| verifyTombstoneExist(P, "key2", false, false); |
| verifyTombstoneExist(R, "key2", false, false); |
| |
| closeCache(P); |
| closeCache(R); |
| createDistributedRegion(R); |
| waitForToVerifyRVV(R, memberP, 7, null, 4); // R's rvv=p7, gc=4 |
| waitForToVerifyRVV(R, memberR, 3, null, 0); // R's rvv=r3, gc=0 |
| verifyTombstoneExist(R, "key2", false, false); |
| |
| createDistributedRegion(T); |
| waitForToVerifyRVV(T, memberP, 7, null, 4); // R's rvv=p7, gc=4 |
| waitForToVerifyRVV(T, memberR, 3, null, 0); // R's rvv=r3, gc=0 |
| // In fullGII, the key size in gii chunk is 3, i.e. key1,key3,key5. key2 is GCed. |
| verifyDeltaSizeFromStats(T, 3, 0); |
| verifyTombstoneExist(T, "key2", false, false); |
| } |
| |
| /** |
| * vm0 and vm1 are peers, each holds a DR. create some exception list. Not to do any tombstone GC |
| * Before GII, P's RVV is P7,R6(3-6), R's RVV is P6,R6, RVVGC are both P0,R0 vm1 becomes offline |
| * then restarts. https://wiki.gemstone.com/display/gfepersistence/DeltaGII+Spec+for+8.0 The |
| * deltaGII should send delta to R, revoke unfinished opeation R4,R5 |
| */ |
| @Test |
| public void testDeltaGIIWithoutRVVGC() throws Throwable { |
| prepareForEachTest(); |
| final DiskStoreID memberP = getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| final long[] exceptionlist = {4, 5}; |
| |
| assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS); |
| prepareCommonTestData(6); |
| waitForToVerifyRVV(P, memberP, 6, null, 0); // P's rvv=p6, gc=0 |
| waitForToVerifyRVV(P, memberR, 3, null, 0); // P's rvv=r3, gc=0 |
| VersionTag expect_tag = getVersionTag(R, "key5"); |
| |
| createUnfinishedOperationsR4R5(); |
| |
| // now P's cache still only has key1, key3 |
| byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME); |
| closeCache(R); |
| // p7 only apply at P |
| doOnePut(P, 7, "key1"); |
| |
| // restart R and gii |
| checkIfFullGII(P, REGION_NAME, R_rvv_bytes, false); |
| createDistributedRegion(R); |
| |
| waitForToVerifyRVV(P, memberP, 7, null, 0); // P's rvv=p7, gc=0 |
| waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6, gc=0 |
| waitForToVerifyRVV(R, memberP, 7, null, 0); // R's rvv=p7, gc=0 |
| waitForToVerifyRVV(R, memberR, 6, exceptionlist, 0); // R's rvv=r6, gc=0 |
| |
| RegionVersionVector p_rvv = getRVV(P); |
| RegionVersionVector r_rvv = getRVV(R); |
| assertSameRVV(p_rvv, r_rvv); |
| |
| // If fullGII, the key size in gii chunk is 4. |
| // tombstone counts in deltaSize, so without rvvgc, it's 4: key1, key2(tombstone), key3, key5 |
| // In delta GII, it should be 1, i.e. P7 (key1) and (key5(T) which is unfinished operation) |
| verifyDeltaSizeFromStats(R, 2, 1); |
| // verify unfinished op for key5 is revoked |
| waitToVerifyKey(R, "key5", generateValue(R)); |
| VersionTag tag = getVersionTag(R, "key5"); |
| assertTrue(expect_tag.equals(tag)); |
| } |
| |
| /** |
| * vm0 and vm1 are peers, each holds a DR. unifinished P8: destroy(key1), finished P9: put(key3). |
| * Shutdown R, then GC tombstones at P. P's RVV=P9,R6(3-6), RVVGC=P8,R0, R's RVV=P9(7-9),R6, RVV |
| * It should trigger fullGII |
| */ |
| @Test |
| public void testFullGIINotDorminatedByProviderRVVGC() throws Throwable { |
| prepareForEachTest(); |
| final DiskStoreID memberP = getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| final long[] exceptionlist = {4, 5}; |
| |
| assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS); |
| prepareCommonTestData(3); |
| |
| createUnfinishedOperationsR4R5(); |
| |
| waitForToVerifyRVV(P, memberP, 3, null, 0); // P's rvv=p3, gc=0 |
| waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6(3-6), gc=0 |
| waitForToVerifyRVV(R, memberP, 3, null, 0); // R's rvv=p3, gc=0 |
| waitForToVerifyRVV(R, memberR, 6, null, 0); // R's rvv=r6, gc=0 |
| |
| // p4-7 only apply at P |
| doOneDestroy(P, 4, "key2"); |
| doOnePut(P, 5, "key1"); |
| doOnePut(P, 6, "key3"); |
| doOnePut(P, 7, "key1"); |
| |
| final long[] exceptionlist2 = {8}; |
| // let p9 to succeed, p8 to be blocked |
| P.invoke(() -> GIIDeltaDUnitTest.slowGII(exceptionlist2)); |
| AsyncInvocation async1 = doOneDestroyAsync(P, 8, "key1"); |
| waitForToVerifyRVV(P, memberP, 8, null, 0); |
| doOnePut(P, 9, "key3"); |
| waitForToVerifyRVV(P, memberP, 9, null, 0); |
| waitForToVerifyRVV(R, memberP, 9, exceptionlist2, 0); |
| |
| byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME); |
| closeCache(R); |
| forceGC(P, 3); |
| // now P's RVV=P9,R6(3-6), RVVGC=P8,R0, R's RVV=P9(7-9), R6 |
| waitForToVerifyRVV(P, memberP, 9, null, 8); // P's rvv=p9, gc=8 |
| waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6(3-6), gc=0 |
| P.invoke(() -> GIIDeltaDUnitTest.resetSlowGII()); |
| |
| // restart and gii, R's rvv should be the same as P's |
| checkIfFullGII(P, REGION_NAME, R_rvv_bytes, true); |
| createDistributedRegion(R); |
| waitForToVerifyRVV(R, memberP, 9, null, 8); // R's rvv=p9, gc=8 |
| waitForToVerifyRVV(R, memberR, 6, exceptionlist, 0); // R's rvv=r6, gc=0 |
| |
| RegionVersionVector p_rvv = getRVV(P); |
| RegionVersionVector r_rvv = getRVV(R); |
| assertSameRVV(p_rvv, r_rvv); // after gii, rvv should be the same |
| |
| // In fullGII, the key size in gii chunk is 2. They are: key3, key5 |
| verifyDeltaSizeFromStats(R, 2, 0); |
| } |
| |
| /** |
| * Let R4, R5 unfinish, but R5 is the last operation from R. So P's RVV is still P:x,R3, without |
| * exception list. But actually R4, R5 are unfinished ops by all means. |
| */ |
| @Test |
| public void testUnfinishedOpsWithoutExceptionList() throws Throwable { |
| prepareForEachTest(); |
| final DiskStoreID memberP = getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| |
| assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS); |
| prepareCommonTestData(6); |
| VersionTag expect_tag = getVersionTag(R, "key5"); |
| |
| final long[] exceptionlist = {4, 5}; |
| R.invoke(() -> GIIDeltaDUnitTest.slowGII(exceptionlist)); |
| AsyncInvocation async1 = doOnePutAsync(R, 4, "key4"); |
| waitForToVerifyRVV(R, memberR, 4, null, 0); // P's rvv=r4, gc=0 |
| |
| AsyncInvocation async2 = doOneDestroyAsync(R, 5, "key5"); |
| waitForToVerifyRVV(R, memberR, 5, null, 0); // P's rvv=r5, gc=0 |
| |
| // P should have unfinished ops R4,R5, but they did not show up in exception list |
| waitForToVerifyRVV(P, memberR, 3, null, 0); // P's rvv=r3, gc=0 |
| |
| // let p7 to succeed |
| doOnePut(P, 7, "key1"); |
| |
| waitForToVerifyRVV(P, memberP, 7, null, 0); // P's rvv=p7, gc=0 |
| waitForToVerifyRVV(P, memberR, 3, null, 0); // P's rvv=r3, gc=0 |
| waitForToVerifyRVV(R, memberP, 7, null, 0); // R's rvv=p7, gc=0 |
| waitForToVerifyRVV(R, memberR, 5, null, 0); // R's rvv=r3, gc=0 |
| |
| // now P's rvv=P7,R3, R's RVV=P7,R5 |
| // shutdown R and restart |
| byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME); |
| closeCache(R); |
| checkIfFullGII(P, REGION_NAME, R_rvv_bytes, false); |
| createDistributedRegion(R); |
| |
| waitForToVerifyRVV(R, memberP, 7, null, 0); // P's rvv=p7, gc=0 |
| waitForToVerifyRVV(R, memberR, 3, exceptionlist, 0); // P's rvv=r3, gc=0 |
| |
| RegionVersionVector p_rvv = getRVV(P); |
| RegionVersionVector r_rvv = getRVV(R); |
| assertSameRVV(p_rvv, r_rvv); // after gii, rvv should be the same |
| |
| // full GII chunk has 4 keys: key1,2(tombstone),3,5 |
| // delta GII chunk has 1 key, i.e. (key5(T) which is unfinished operation) |
| verifyDeltaSizeFromStats(R, 1, 1); |
| // verify unfinished op for key5 is revoked |
| waitToVerifyKey(R, "key5", generateValue(R)); |
| VersionTag tag = getVersionTag(R, "key5"); |
| assertTrue(expect_tag.equals(tag)); |
| |
| // shutdown R again and restart, to verify localVersion=5 will be saved and recovered |
| closeCache(R); |
| createDistributedRegion(R); |
| |
| // P will receive R6 and have exception R6(3-6) |
| doOnePut(R, 6, "key1"); // r6 will pass |
| waitForToVerifyRVV(R, memberR, 6, exceptionlist, 0); // R's rvv=r6, gc=0 |
| waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6(3-6), gc=0 |
| } |
| |
| /** |
| * P, R has key1 unchanged, make certain unfinished operations on R to trigger full gii. |
| */ |
| @Test |
| public void testRecoveredFromDiskBitAfterFullGII() { |
| prepareForEachTest(); |
| getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS); |
| |
| verifyRecoveredFromDiskBitAfterGII(memberR, true); |
| } |
| |
| /** |
| * P, R has key1 unchanged, make one unfinished operation on R to trigger delta gii. |
| */ |
| @Test |
| public void testRecoveredFromDiskBitAfterDeltaGII() { |
| prepareForEachTest(); |
| getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS); |
| |
| verifyRecoveredFromDiskBitAfterGII(memberR, false); |
| } |
| |
| private void verifyRecoveredFromDiskBitAfterGII(final DiskStoreID memberR, boolean isFullGII) { |
| final long[] exceptionlist = {3, 4}; |
| doOnePut(R, 1, "key1"); |
| doOnePut(R, 2, "key2"); |
| |
| R.invoke(() -> GIIDeltaDUnitTest.slowGII(exceptionlist)); |
| doOnePutAsync(R, 3, "key2"); |
| waitForToVerifyRVV(R, memberR, 3, null, 0); // R's rvv=r3, gc=0 |
| |
| if (isFullGII) { |
| doOnePutAsync(R, 4, "key2"); |
| waitForToVerifyRVV(R, memberR, 4, null, 0); // R's rvv=r4, gc=0 |
| } |
| |
| closeCache(R); |
| |
| doOnePut(P, 1, "key2"); |
| |
| // restart R |
| createDistributedRegion(R); |
| |
| R.invoke(() -> verifyRecoveredFromDiskBit("key2", isFullGII, true)); |
| R.invoke(() -> verifyRecoveredFromDiskBit("key1", isFullGII, false)); |
| } |
| |
| private void verifyRecoveredFromDiskBit(String key, boolean isFullGII, |
| boolean versionTagBeenChanged) { |
| LocalRegion lr = (LocalRegion) getCache().getRegion(REGION_NAME); |
| DiskEntry re = (DiskEntry) lr.getRegionEntry(key); |
| DiskId id = re.getDiskId(); |
| |
| byte usebits = id.getUserBits(); |
| if (!versionTagBeenChanged && !isFullGII) { |
| // in delta gii, versionTag not changed entry should kept the recoveredFromDisk bit |
| assertTrue(EntryBits.isRecoveredFromDisk(usebits)); |
| } else { |
| assertFalse(EntryBits.isRecoveredFromDisk(usebits)); |
| } |
| } |
| |
| /** |
| * P1, P2, P3 R does GII but wait at BeforeSavedReceivedRVV, so R's RVV=P3R0 P4, P5 R goes on to |
| * save received RVV. R's new RVV=P5(3-6)R0 |
| * |
| * When image (which contains P4, P5) arrives, it should fill the special exception |
| */ |
| @Test |
| public void testFillSpecialException() throws Throwable { |
| prepareForEachTest(); |
| final DiskStoreID memberP = getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| final long[] exceptionlist = {4, 5}; |
| |
| doOnePut(P, 1, "key1"); |
| doOnePut(P, 2, "key2"); |
| doOneDestroy(P, 3, "key1"); |
| changeForceFullGII(R, false, true); |
| |
| // 4 |
| R.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| Mycallback myBeforeSavedReceivedRVV = |
| new Mycallback(GIITestHookType.BeforeSavedReceivedRVV, REGION_NAME); |
| InitialImageOperation.setGIITestHook(myBeforeSavedReceivedRVV); |
| } |
| }); |
| // 5 |
| R.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| Mycallback myAfterSavedReceivedRVV = |
| new Mycallback(GIITestHookType.AfterSavedReceivedRVV, REGION_NAME); |
| InitialImageOperation.setGIITestHook(myAfterSavedReceivedRVV); |
| } |
| }); |
| |
| closeCache(R); |
| changeForceFullGII(R, true, false); |
| AsyncInvocation async3 = createDistributedRegionAsync(R); |
| waitForCallbackStarted(R, GIITestHookType.BeforeSavedReceivedRVV); |
| |
| doOneDestroy(P, 4, "key2"); |
| doOnePut(P, 5, "key1"); |
| R.invoke( |
| () -> InitialImageOperation.resetGIITestHook(GIITestHookType.BeforeSavedReceivedRVV, true)); |
| waitForCallbackStarted(R, GIITestHookType.AfterSavedReceivedRVV); |
| R.invoke( |
| () -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterSavedReceivedRVV, true)); |
| async3.join(MAX_WAIT); |
| waitForToVerifyRVV(R, memberP, 5, null, 0); // P's rvv=r5, gc=0 |
| changeForceFullGII(R, true, true); |
| changeForceFullGII(P, false, true); |
| verifyDeltaSizeFromStats(R, 2, 0); |
| } |
| |
| /** |
| * P1, P2, P3 R does GII but wait at BeforeSavedReceivedRVV, so R's RVV=P3R0 P4, P5 R goes on to |
| * save received RVV. R's new RVV=P5(3-6)R0 |
| * |
| * When image (which contains P4, P5) arrives, it should fill the special exception |
| * |
| * This test will let the 2 operations happen before RIM, so the rvv will match between R and P |
| * and no gii will happen. In this way, the chunk will not contain the missing entries. |
| */ |
| @Test |
| public void testFillSpecialException2() throws Throwable { |
| prepareForEachTest(); |
| final DiskStoreID memberP = getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| final long[] exceptionlist = {4, 5}; |
| |
| doOnePut(P, 1, "key1"); |
| doOnePut(P, 2, "key2"); |
| doOneDestroy(P, 3, "key1"); |
| changeForceFullGII(R, false, true); |
| |
| // 3 |
| R.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| Mycallback myAfterCalculatedUnfinishedOps = |
| new Mycallback(GIITestHookType.AfterCalculatedUnfinishedOps, REGION_NAME); |
| InitialImageOperation.setGIITestHook(myAfterCalculatedUnfinishedOps); |
| } |
| }); |
| // 5 |
| R.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| Mycallback myAfterSavedReceivedRVV = |
| new Mycallback(GIITestHookType.AfterSavedReceivedRVV, REGION_NAME); |
| InitialImageOperation.setGIITestHook(myAfterSavedReceivedRVV); |
| } |
| }); |
| |
| closeCache(R); |
| AsyncInvocation async3 = createDistributedRegionAsync(R); |
| waitForCallbackStarted(R, GIITestHookType.AfterCalculatedUnfinishedOps); |
| |
| doOneDestroy(P, 4, "key2"); |
| doOnePut(P, 5, "key1"); |
| R.invoke(() -> InitialImageOperation |
| .resetGIITestHook(GIITestHookType.AfterCalculatedUnfinishedOps, true)); |
| waitForCallbackStarted(R, GIITestHookType.AfterSavedReceivedRVV); |
| R.invoke( |
| () -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterSavedReceivedRVV, true)); |
| async3.join(MAX_WAIT); |
| waitForToVerifyRVV(R, memberP, 5, null, 0); // P's rvv=r5, gc=0 |
| verifyDeltaSizeFromStats(R, 2, 1); |
| changeForceFullGII(R, false, true); |
| changeForceFullGII(P, false, true); |
| } |
| |
| /* |
| * 1. after R5 becomes R4, regenrate another R5 2. verify stattic callback at provider 3. gii |
| * packing should wait for tombstone GC 4. gii packing should wait for other on-fly operations |
| */ |
| @Test |
| public void testHooks() throws Throwable { |
| prepareForEachTest(); |
| final DiskStoreID memberP = getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| |
| assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS); |
| prepareCommonTestData(6); |
| VersionTag expect_tag = getVersionTag(R, "key5"); |
| |
| final long[] exceptionlist = {4, 5}; |
| R.invoke(() -> GIIDeltaDUnitTest.slowGII(exceptionlist)); |
| AsyncInvocation async1 = doOnePutAsync(R, 4, "key4"); |
| waitForToVerifyRVV(R, memberR, 4, null, 0); // P's rvv=r4, gc=0 |
| |
| AsyncInvocation async2 = doOneDestroyAsync(R, 5, "key5"); |
| waitForToVerifyRVV(R, memberR, 5, null, 0); // P's rvv=r5, gc=0 |
| |
| // P should have unfinished ops R4,R5, but they did not show up in exception list |
| waitForToVerifyRVV(P, memberR, 3, null, 0); // P's rvv=r3, gc=0 |
| |
| |
| // define test hooks |
| // 1 |
| R.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| Mycallback myBeforeRequestRVV = |
| new Mycallback(GIITestHookType.BeforeRequestRVV, REGION_NAME); |
| InitialImageOperation.setGIITestHook(myBeforeRequestRVV); |
| } |
| }); |
| |
| // 2 |
| R.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| Mycallback myAfterRequestRVV = new Mycallback(GIITestHookType.AfterRequestRVV, REGION_NAME); |
| InitialImageOperation.setGIITestHook(myAfterRequestRVV); |
| } |
| }); |
| |
| // 3 |
| R.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| Mycallback myAfterCalculatedUnfinishedOps = |
| new Mycallback(GIITestHookType.AfterCalculatedUnfinishedOps, REGION_NAME); |
| InitialImageOperation.setGIITestHook(myAfterCalculatedUnfinishedOps); |
| } |
| }); |
| |
| // 4 |
| R.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| Mycallback myBeforeSavedReceivedRVV = |
| new Mycallback(GIITestHookType.BeforeSavedReceivedRVV, REGION_NAME); |
| InitialImageOperation.setGIITestHook(myBeforeSavedReceivedRVV); |
| } |
| }); |
| |
| // 5 |
| R.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| Mycallback myAfterSavedReceivedRVV = |
| new Mycallback(GIITestHookType.AfterSavedReceivedRVV, REGION_NAME); |
| InitialImageOperation.setGIITestHook(myAfterSavedReceivedRVV); |
| } |
| }); |
| |
| // 6 |
| R.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| Mycallback myAfterSentRequestImage = |
| new Mycallback(GIITestHookType.AfterSentRequestImage, REGION_NAME); |
| InitialImageOperation.setGIITestHook(myAfterSentRequestImage); |
| } |
| }); |
| |
| // 7 |
| P.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| Mycallback myAfterReceivedRequestImage = |
| new Mycallback(GIITestHookType.AfterReceivedRequestImage, REGION_NAME); |
| InitialImageOperation.setGIITestHook(myAfterReceivedRequestImage); |
| } |
| }); |
| |
| // 8 |
| P.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| Mycallback myDuringPackingImage = |
| new Mycallback(GIITestHookType.DuringPackingImage, REGION_NAME); |
| InitialImageOperation.setGIITestHook(myDuringPackingImage); |
| } |
| }); |
| |
| // 9 |
| P.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| Mycallback myAfterSentImageReply = |
| new Mycallback(GIITestHookType.AfterSentImageReply, REGION_NAME); |
| InitialImageOperation.setGIITestHook(myAfterSentImageReply); |
| } |
| }); |
| |
| // 10 |
| R.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| Mycallback myAfterReceivedImageReply = |
| new Mycallback(GIITestHookType.AfterReceivedImageReply, REGION_NAME); |
| InitialImageOperation.setGIITestHook(myAfterReceivedImageReply); |
| } |
| }); |
| |
| // 11 |
| R.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| Mycallback myDuringApplyDelta = |
| new Mycallback(GIITestHookType.DuringApplyDelta, REGION_NAME); |
| InitialImageOperation.setGIITestHook(myDuringApplyDelta); |
| } |
| }); |
| |
| // 12 |
| R.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| Mycallback myBeforeCleanExpiredTombstones = |
| new Mycallback(GIITestHookType.BeforeCleanExpiredTombstones, REGION_NAME); |
| InitialImageOperation.setGIITestHook(myBeforeCleanExpiredTombstones); |
| } |
| }); |
| |
| // 13 |
| R.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| Mycallback myAfterSavedRVVEnd = |
| new Mycallback(GIITestHookType.AfterSavedRVVEnd, REGION_NAME); |
| InitialImageOperation.setGIITestHook(myAfterSavedRVVEnd); |
| } |
| }); |
| |
| |
| // shutdown R and restart |
| waitForToVerifyRVV(R, memberP, 6, null, 0); // R's rvv=p6, gc=0 |
| waitForToVerifyRVV(R, memberR, 5, null, 0); // R's rvv=r3, gc=0 |
| byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME); |
| closeCache(R); |
| |
| // let p7 to succeed |
| doOnePut(P, 7, "key1"); |
| |
| waitForToVerifyRVV(P, memberP, 7, null, 0); // P's rvv=p7, gc=0 |
| waitForToVerifyRVV(P, memberR, 3, null, 0); // P's rvv=r3, gc=0 |
| // now P's rvv=P7,R3, R's RVV=P6,R5 |
| checkIfFullGII(P, REGION_NAME, R_rvv_bytes, false); |
| AsyncInvocation async3 = createDistributedRegionAsync(R); |
| |
| // 1 |
| waitForCallbackStarted(R, GIITestHookType.BeforeRequestRVV); |
| R.invoke(() -> InitialImageOperation.resetGIITestHook(GIITestHookType.BeforeRequestRVV, true)); |
| // 2 |
| waitForCallbackStarted(R, GIITestHookType.AfterRequestRVV); |
| R.invoke(() -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterRequestRVV, true)); |
| // 3 |
| waitForCallbackStarted(R, GIITestHookType.AfterCalculatedUnfinishedOps); |
| R.invoke(() -> InitialImageOperation |
| .resetGIITestHook(GIITestHookType.AfterCalculatedUnfinishedOps, true)); |
| // 4 |
| waitForCallbackStarted(R, GIITestHookType.BeforeSavedReceivedRVV); |
| R.invoke( |
| () -> InitialImageOperation.resetGIITestHook(GIITestHookType.BeforeSavedReceivedRVV, true)); |
| // 5 |
| waitForCallbackStarted(R, GIITestHookType.AfterSavedReceivedRVV); |
| R.invoke( |
| () -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterSavedReceivedRVV, true)); |
| |
| // 6 |
| waitForCallbackStarted(R, GIITestHookType.AfterSentRequestImage); |
| R.invoke( |
| () -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterSentRequestImage, true)); |
| |
| // 7 |
| waitForCallbackStarted(P, GIITestHookType.AfterReceivedRequestImage); |
| P.invoke(() -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterReceivedRequestImage, |
| true)); |
| // 8 |
| waitForCallbackStarted(P, GIITestHookType.DuringPackingImage); |
| P.invoke( |
| () -> InitialImageOperation.resetGIITestHook(GIITestHookType.DuringPackingImage, true)); |
| // 9 |
| waitForCallbackStarted(P, GIITestHookType.AfterSentImageReply); |
| P.invoke( |
| () -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterSentImageReply, true)); |
| |
| // 10 |
| waitForCallbackStarted(R, GIITestHookType.AfterReceivedImageReply); |
| R.invoke(() -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterReceivedImageReply, |
| true)); |
| // 11 |
| waitForCallbackStarted(R, GIITestHookType.DuringApplyDelta); |
| R.invoke(() -> InitialImageOperation.resetGIITestHook(GIITestHookType.DuringApplyDelta, true)); |
| // 12 |
| waitForCallbackStarted(R, GIITestHookType.BeforeCleanExpiredTombstones); |
| R.invoke(() -> InitialImageOperation |
| .resetGIITestHook(GIITestHookType.BeforeCleanExpiredTombstones, true)); |
| // 13 |
| waitForCallbackStarted(R, GIITestHookType.AfterSavedRVVEnd); |
| R.invoke(() -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterSavedRVVEnd, true)); |
| |
| async3.join(MAX_WAIT); |
| |
| waitForToVerifyRVV(R, memberP, 7, null, 0); // P's rvv=p7, gc=0 |
| waitForToVerifyRVV(R, memberR, 3, exceptionlist, 0); // P's rvv=r3, gc=0 |
| |
| RegionVersionVector p_rvv = getRVV(P); |
| RegionVersionVector r_rvv = getRVV(R); |
| assertSameRVV(p_rvv, r_rvv); // after gii, rvv should be the same |
| |
| // full gii chunk has 4 entries: key1,2(tombstone),3,5 |
| // delta gii has 2 entry: p7 (key1) and (key5(T) which is unfinished operation) |
| verifyDeltaSizeFromStats(R, 2, 1); |
| // verify unfinished op for key5 is revoked |
| waitToVerifyKey(R, "key5", generateValue(R)); |
| VersionTag tag = getVersionTag(R, "key5"); |
| assertTrue(expect_tag.equals(tag)); |
| |
| // P will receive R6 and have exception R6(3-6) |
| doOnePut(R, 6, "key1"); // r6 will pass |
| waitForToVerifyRVV(R, memberR, 6, exceptionlist, 0); // R's rvv=r6, gc=0 |
| waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6(3-6), gc=0 |
| |
| P.invoke(() -> InitialImageOperation.resetAllGIITestHooks()); |
| } |
| |
| /** |
| * vm0 and vm1 are peers, each holds a DR. create some exception list. Then shutdown R. Before |
| * GII, P's RVV is P7,R6(3-6), RVVGC is P0,R0; R's RVV is P3,R6, RVVGC is P0,R0 vm1 becomes |
| * offline then restarts. Use testHook to pause the GII, then do tombstone GC at P only. The |
| * deltaGII should send correct tombstonedelta to R, revoke unfinished opeation R4,R5 |
| * |
| * There's member T doing GII from P at the same time. |
| * |
| * In this test, GII thread will get the GIILock before tombstone GC, so tombstone GC should wait |
| * for all GIIs to finish |
| */ |
| @Test |
| public void testTombstoneGCInMiddleOfGII() throws Throwable { |
| prepareForEachTest(); |
| final DiskStoreID memberP = getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| final long[] exceptionlist = {4, 5}; |
| |
| Host host = getHost(0); |
| VM T = host.getVM(2); |
| createDistributedRegion(T); |
| final DiskStoreID memberT = getMemberID(T); |
| closeCache(T); |
| |
| assertEquals(0, SLOW_DISTRIBUTION_MS); |
| prepareCommonTestData(3); |
| waitForToVerifyRVV(P, memberP, 3, null, 0); // P's rvv=p3, gc=0 |
| VersionTag expect_tag = getVersionTag(R, "key5"); |
| |
| createUnfinishedOperationsR4R5(); |
| waitForToVerifyRVV(R, memberP, 3, null, 0); // R's rvv=p3, gc=0 |
| byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME); |
| closeCache(R); |
| |
| // p4-7 only apply at P |
| doOneDestroy(P, 4, "key2"); |
| doOnePut(P, 5, "key1"); |
| doOnePut(P, 6, "key3"); |
| doOnePut(P, 7, "key1"); |
| |
| // add test hook |
| // 8 |
| P.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| Mycallback myDuringPackingImage = |
| new Mycallback(DuringPackingImage, REGION_NAME); |
| setGIITestHook(myDuringPackingImage); |
| } |
| }); |
| |
| checkIfFullGII(P, REGION_NAME, R_rvv_bytes, false); |
| |
| // restart R and gii, it will be blocked at test hook |
| AsyncInvocation async3 = createDistributedRegionAsync(R); |
| // restart R and gii, it will be blocked at test hook |
| AsyncInvocation async4 = createDistributedRegionAsync(T); |
| // 8 |
| waitForCallbackStarted(P, DuringPackingImage); |
| WaitCriterion ev = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| int count = getDeltaGIICount(P); |
| return (count == 2); |
| } |
| |
| @Override |
| public String description() { |
| return null; |
| } |
| }; |
| |
| GeodeAwaitility.await().untilAsserted(ev); |
| int count = getDeltaGIICount(P); |
| assertEquals(2, count); |
| |
| // force tombstone GC to let RVVGC to become P4:R0, but R already sent its old RVV/RVVGC over |
| // this tombstone GC happens AFTER GII thread got the GIILock, so it will be ignored since GII |
| // is ongoing |
| changeTombstoneTimout(R, MAX_WAIT); |
| changeTombstoneTimout(P, MAX_WAIT); |
| changeTombstoneTimout(T, MAX_WAIT); |
| pause((int) MAX_WAIT); |
| forceGC(P, 2); |
| waitForToVerifyRVV(P, memberP, 7, null, 0); // P's rvv=p7, gc=0 |
| |
| // let GII continue |
| P.invoke( |
| () -> resetGIITestHook(DuringPackingImage, false)); |
| P.invoke( |
| () -> resetGIITestHook(DuringPackingImage, true)); |
| |
| WaitCriterion ev2 = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| int count = getDeltaGIICount(P); |
| return (count == 0); |
| } |
| |
| @Override |
| public String description() { |
| return null; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(ev2); |
| count = getDeltaGIICount(P); |
| assertEquals(0, count); |
| verifyTombstoneExist(P, "key2", true, true); // expect key2 is still tombstone during and after |
| // GIIs |
| verifyTombstoneExist(R, "key2", true, true); // expect key2 is still tombstone during and after |
| // GIIs |
| verifyTombstoneExist(T, "key2", true, true); // expect key2 is still tombstone during and after |
| // GIIs |
| |
| forceGC(P, 1); // trigger to GC the tombstones in expired queue |
| async3.join(MAX_WAIT); |
| async4.join(MAX_WAIT); |
| |
| // after GII, tombstone GC happened |
| waitForToVerifyRVV(P, memberP, 7, null, 4); // P's rvv=p6, gc=4 |
| waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6(3-6), gc=0 |
| |
| waitForToVerifyRVV(R, memberP, 7, null, 4); // R's rvv=p7, gc=4 |
| waitForToVerifyRVV(R, memberR, 6, exceptionlist, 0); // R's rvv=r6, gc=0 |
| |
| verifyTombstoneExist(P, "key2", false, true); // expect tombstone key2 is GCed at P |
| verifyTombstoneExist(R, "key2", false, true); // expect tombstone key2 is GCed at R |
| verifyTombstoneExist(T, "key2", false, true); // T got everything from P |
| |
| // do a put from T |
| doOnePut(T, 1, "key1"); |
| |
| RegionVersionVector p_rvv = getRVV(P); |
| RegionVersionVector r_rvv = getRVV(R); |
| RegionVersionVector t_rvv = getRVV(R); |
| assertSameRVV(p_rvv, r_rvv); |
| assertSameRVV(t_rvv, r_rvv); |
| |
| // If fullGII, the key size in gii chunk is 4, i.e. key1,key3,key5,key2 is a tombstone. |
| // If delta GII, it should be 4, (key1, key2, key3) and (key5(T) which is unfinished operation) |
| verifyDeltaSizeFromStats(R, 4, 1); |
| // verify unfinished op for key5 is revoked |
| waitToVerifyKey(R, "key5", generateValue(R)); |
| VersionTag tag = getVersionTag(R, "key5"); |
| assertTrue(expect_tag.equals(tag)); |
| // P.invoke(() -> InitialImageOperation.resetAllGIITestHooks()); |
| } |
| |
| /** |
| * vm0 and vm1 are peers, each holds a DR. Let P and R have the same RVV and RVVGC: P7,R6, RVVGC |
| * is P0,R0. vm1 becomes offline then restarts. Use testHook to pause the GII, then do tombstone |
| * GC triggered by expireBatch (not by forceGC) at R only. The tombstone GC will be executed at R, |
| * but igored at P. The deltaGII should send nothing to R since the RVVs are the same. So after |
| * GII, P and R will have different tombstone number. But P's tombstones should be expired. |
| */ |
| @Test |
| public void testExpiredTombstoneSkippedAtProviderOnly() throws Throwable { |
| prepareForEachTest(); |
| final DiskStoreID memberP = getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| |
| assertEquals(0, SLOW_DISTRIBUTION_MS); |
| prepareCommonTestData(6); |
| |
| // let r4,r5,r6 to succeed |
| doOnePut(R, 4, "key4"); |
| doOneDestroy(R, 5, "key5"); |
| doOnePut(R, 6, "key1"); |
| |
| waitForToVerifyRVV(R, memberP, 6, null, 0); // P's rvv=p6, gc=0 |
| waitForToVerifyRVV(R, memberR, 6, null, 0); // P's rvv=r6, gc=0 |
| // now the rvv and rvvgc at P and R should be the same |
| |
| // save R's rvv in byte array, check if it will be fullGII |
| byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME); |
| // shutdown R and restart |
| closeCache(R); |
| |
| // let p7 to succeed |
| doOnePut(P, 7, "key1"); |
| |
| waitForToVerifyRVV(P, memberP, 7, null, 0); // P's rvv=p7, gc=0 |
| waitForToVerifyRVV(P, memberR, 6, null, 0); // P's rvv=r6, gc=0 |
| |
| // add test hook |
| P.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| Mycallback myDuringPackingImage = |
| new Mycallback(DuringPackingImage, REGION_NAME); |
| setGIITestHook(myDuringPackingImage); |
| } |
| }); |
| |
| checkIfFullGII(P, REGION_NAME, R_rvv_bytes, false); |
| |
| // restart R and gii, it will be blocked at test hook |
| AsyncInvocation async3 = createDistributedRegionAsync(R); |
| // 8 |
| waitForCallbackStarted(P, DuringPackingImage); |
| WaitCriterion ev = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| int count = getDeltaGIICount(P); |
| return (count == 1); |
| } |
| |
| @Override |
| public String description() { |
| return null; |
| } |
| }; |
| |
| GeodeAwaitility.await().untilAsserted(ev); |
| int count = getDeltaGIICount(P); |
| assertEquals(1, count); |
| |
| // let tombstone expired at R to trigger tombstoneGC. |
| // Wait for tombstone is GCed at R, but still exists in P |
| changeTombstoneTimout(R, MAX_WAIT); |
| changeTombstoneTimout(P, MAX_WAIT); |
| pause((int) MAX_WAIT); |
| forceGC(R, 3); |
| forceGC(P, 3); |
| |
| // let GII continue |
| P.invoke( |
| () -> resetGIITestHook(DuringPackingImage, true)); |
| async3.join(MAX_WAIT * 2); |
| count = getDeltaGIICount(P); |
| assertEquals(0, count); |
| verifyDeltaSizeFromStats(R, 1, 1); // deltaGII, key1 in delta |
| |
| // tombstone key2, key5 should be GCed at R |
| verifyTombstoneExist(R, "key2", false, false); |
| verifyTombstoneExist(R, "key5", false, false); |
| |
| // tombstone key2, key5 should still exist and expired at P |
| verifyTombstoneExist(P, "key2", true, true); |
| verifyTombstoneExist(P, "key5", true, true); |
| |
| RegionVersionVector p_rvv = getRVV(P); |
| RegionVersionVector r_rvv = getRVV(R); |
| out.println("GGG:p_rvv=" + p_rvv.fullToString() + ":r_rvv=" + r_rvv.fullToString()); |
| |
| waitForToVerifyRVV(R, memberP, 7, null, 4); // R's rvv=p7, gc=4 |
| waitForToVerifyRVV(R, memberR, 6, null, 5); // R's rvv=r6, gc=5 |
| waitForToVerifyRVV(P, memberP, 7, null, 0); // P's rvv=p7, gc=0 |
| waitForToVerifyRVV(P, memberR, 6, null, 0); // P's rvv=r6, gc=0 |
| } |
| |
| /** |
| * vm0 and vm1 are peers, each holds a DR. Each does a few operations to make RVV=P7,R6, |
| * RVVGC=P4,R0 for both members. vm1 becomes offline then restarts. The deltaGII should only |
| * exchange RVV. No need to send data from vm0 to vm1. |
| */ |
| @Test |
| public void testRequesterHasHigherRVVGC() throws Throwable { |
| prepareForEachTest(); |
| final DiskStoreID memberP = getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| |
| assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS); |
| prepareCommonTestData(6); |
| |
| // let r4,r5,r6 to succeed |
| doOnePut(R, 4, "key4"); |
| doOneDestroy(R, 5, "key5"); |
| doOnePut(R, 6, "key1"); |
| |
| // let P pretends to be doing GII, to skip gcTombstone |
| forceAddGIICount(P); |
| changeTombstoneTimout(R, MAX_WAIT); |
| changeTombstoneTimout(P, MAX_WAIT); |
| Wait.pause((int) MAX_WAIT); |
| forceGC(R, 3); |
| waitForToVerifyRVV(P, memberR, 6, null, 0); // P's rvv=r6, gc=0 |
| waitForToVerifyRVV(P, memberR, 6, null, 0); // P's rvv=r6, gc=0 |
| waitForToVerifyRVV(R, memberP, 6, null, 4); // P's rvv=p6, gc=4 |
| waitForToVerifyRVV(R, memberR, 6, null, 5); // R's rvv=r6, gc=5 |
| verifyTombstoneExist(R, "key5", false, false); |
| verifyTombstoneExist(P, "key5", true, true); |
| |
| byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME); |
| closeCache(R); |
| // let p7 to succeed |
| doOnePut(P, 7, "key1"); |
| |
| // shutdown R and restart |
| checkIfFullGII(P, REGION_NAME, R_rvv_bytes, false); |
| createDistributedRegion(R); |
| waitForToVerifyRVV(R, memberP, 7, null, 4); // P's rvv=p7, gc=4 |
| waitForToVerifyRVV(R, memberR, 6, null, 5); // P's rvv=r6, gc=5 |
| waitForToVerifyRVV(P, memberP, 7, null, 0); // P's rvv=r7, gc still 0 |
| waitForToVerifyRVV(P, memberR, 6, null, 0); // P's rvv=r6, gc still 0 |
| |
| // If fullGII, the key size in gii chunk is 5, i.e. key1,key2(T),key3,key4,key5(T). |
| // If deltaGII, the key size is 1, i.e. P7 (key1) |
| verifyDeltaSizeFromStats(R, 1, 1); |
| verifyTombstoneExist(R, "key5", false, false); |
| verifyTombstoneExist(P, "key5", true, true); |
| } |
| |
| /** |
| * P and R are peers, each holds a DR. Each does a few operations to make RVV=P7,R6, RVVGC=P0,R0 |
| * for both members. P8 is clear() operation. After that, R offline. Run P9 is a put. Restart R. R |
| * will do deltaGII to get P9 as delta |
| */ |
| @Test |
| public void testDeltaGIIAfterClear() throws Throwable { |
| prepareForEachTest(); |
| final DiskStoreID memberP = getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| final long[] exceptionlist = {4, 5}; |
| |
| assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS); |
| prepareCommonTestData(6); |
| // let r4,r5,r6 to succeed |
| doOnePut(R, 4, "key4"); |
| doOneDestroy(R, 5, "key5"); |
| doOnePut(R, 6, "key1"); |
| |
| doOnePut(P, 7, "key1"); |
| |
| waitForToVerifyRVV(P, memberP, 7, null, 0); // P's rvv=p7, gc=0 |
| waitForToVerifyRVV(P, memberR, 6, null, 0); // P's rvv=r6, gc=0 |
| waitForToVerifyRVV(R, memberP, 7, null, 0); // R's rvv=P7, gc=0 |
| waitForToVerifyRVV(R, memberR, 6, null, 0); // R's rvv=r6, gc=0 |
| |
| // Note: since R is still online, clear will do flush message which will be blocked by the |
| // test CDL (to create unfinished operation). So in this test, no exception |
| doOneClear(P, 8); |
| // clear() increased P's version with 1 to P8 |
| // after clear, P and R's RVVGC == RVV |
| waitForToVerifyRVV(P, memberP, 8, null, 8); // P's rvv=p8, gc=8 |
| waitForToVerifyRVV(P, memberR, 6, null, 6); // P's rvv=r6, gc=6 |
| waitForToVerifyRVV(R, memberP, 8, null, 8); // R's rvv=p8, gc=8 |
| waitForToVerifyRVV(R, memberR, 6, null, 6); // R's rvv=r6, gc=6 |
| |
| // shutdown R |
| byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME); |
| closeCache(R); |
| |
| // do a put at P to get some delta |
| doOnePut(P, 9, "key3"); |
| waitForToVerifyRVV(P, memberP, 9, null, 8); |
| waitForToVerifyRVV(P, memberR, 6, null, 6); |
| |
| // restart R to deltaGII |
| checkIfFullGII(P, REGION_NAME, R_rvv_bytes, false); |
| |
| // shutdown P and restart |
| closeCache(P); |
| createDistributedRegion(P); |
| waitForToVerifyRVV(P, memberP, 9, null, 8); |
| waitForToVerifyRVV(P, memberR, 6, null, 6); |
| |
| createDistributedRegion(R); |
| waitForToVerifyRVV(R, memberP, 9, null, 8); |
| waitForToVerifyRVV(R, memberR, 6, null, 6); |
| |
| RegionVersionVector p_rvv = getRVV(P); |
| RegionVersionVector r_rvv = getRVV(R); |
| assertSameRVV(p_rvv, r_rvv); // after gii, rvv should be the same |
| |
| verifyDeltaSizeFromStats(R, 1, 1); |
| } |
| |
| /** |
| * P and R are peers, each holds a DR. Each does a few operations to make RVV=P6,R6, RVVGC=P0,R0 |
| * for both members. R off line, then run P7. Restart R. It will trigger deltaGII to chunk entry |
| * P7(key1). After that, do clear(). Make sure R should not contain key1 after GII. |
| */ |
| @Test |
| public void testClearAfterChunkEntries() throws Throwable { |
| prepareForEachTest(); |
| final DiskStoreID memberP = getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| final long[] exceptionlist = {4, 5}; |
| |
| assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS); |
| prepareCommonTestData(6); |
| // let r4,r5,r6 to succeed |
| doOnePut(R, 4, "key4"); |
| doOneDestroy(R, 5, "key5"); |
| doOnePut(R, 6, "key1"); |
| |
| waitForToVerifyRVV(P, memberP, 6, null, 0); // P's rvv=p6, gc=0 |
| waitForToVerifyRVV(P, memberR, 6, null, 0); // P's rvv=r6, gc=0 |
| waitForToVerifyRVV(R, memberP, 6, null, 0); // R's rvv=P6, gc=0 |
| waitForToVerifyRVV(R, memberR, 6, null, 0); // R's rvv=r6, gc=0 |
| |
| // set tesk hook |
| R.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| Mycallback myAfterReceivedImageReply = |
| new Mycallback(GIITestHookType.AfterReceivedImageReply, REGION_NAME); |
| InitialImageOperation.setGIITestHook(myAfterReceivedImageReply); |
| } |
| }); |
| |
| // shutdown R |
| byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME); |
| closeCache(R); |
| |
| // key1 will be the delta |
| doOnePut(P, 7, "key1"); |
| |
| // retart R |
| checkIfFullGII(P, REGION_NAME, R_rvv_bytes, false); |
| AsyncInvocation async3 = createDistributedRegionAsync(R); |
| |
| // when chunk arrived, do clear() |
| waitForCallbackStarted(R, GIITestHookType.AfterReceivedImageReply); |
| doOneClear(P, 8); |
| |
| R.invoke(() -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterReceivedImageReply, |
| true)); |
| async3.getResult(MAX_WAIT); |
| |
| // clear() increased P's version with 1 to P8 |
| // after clear, P and R's RVVGC == RVV |
| waitForToVerifyRVV(P, memberP, 8, null, 8); // P's rvv=r8, gc=8 |
| waitForToVerifyRVV(P, memberR, 6, null, 6); // P's rvv=r6, gc=6 |
| |
| // retart R again to do fullGII |
| closeCache(R); |
| createDistributedRegion(R); |
| |
| RegionVersionVector p_rvv = getRVV(P); |
| RegionVersionVector r_rvv = getRVV(R); |
| assertSameRVV(p_rvv, r_rvv); // after gii, rvv should be the same |
| |
| waitForToVerifyRVV(R, memberP, 8, null, 8); // R's rvv=r8, gc=8 |
| waitForToVerifyRVV(R, memberR, 6, null, 6); // R's rvv=r6, gc=6 |
| waitToVerifyKey(P, "key1", null); |
| waitToVerifyKey(R, "key1", null); |
| verifyDeltaSizeFromStats(R, 0, 1); |
| } |
| |
| /** |
| * P and R are peers, each holds a DR. Each does a few operations to make RVV=P6,R6, RVVGC=P0,R0 |
| * for both members. R off line, then run P7. Restart R. When P's RVV arrives, do clear(). It |
| * should trigger fullGII. Make sure R should not contain key1 after GII. |
| */ |
| @Test |
| public void testClearAfterSavedRVV() throws Throwable { |
| prepareForEachTest(); |
| final DiskStoreID memberP = getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| final long[] exceptionlist = {4, 5}; |
| |
| assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS); |
| prepareCommonTestData(6); |
| // let r4,r5,r6 to succeed |
| doOnePut(R, 4, "key4"); |
| doOneDestroy(R, 5, "key5"); |
| doOnePut(R, 6, "key1"); |
| |
| waitForToVerifyRVV(P, memberP, 6, null, 0); // P's rvv=p6, gc=0 |
| waitForToVerifyRVV(P, memberR, 6, null, 0); // P's rvv=r6, gc=0 |
| waitForToVerifyRVV(R, memberP, 6, null, 0); // R's rvv=P6, gc=0 |
| waitForToVerifyRVV(R, memberR, 6, null, 0); // R's rvv=r6, gc=0 |
| |
| // set tesk hook |
| R.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| Mycallback myAfterSavedReceivedRVV = |
| new Mycallback(GIITestHookType.AfterSavedReceivedRVV, REGION_NAME); |
| InitialImageOperation.setGIITestHook(myAfterSavedReceivedRVV); |
| } |
| }); |
| |
| // shutdown R |
| closeCache(R); |
| |
| // key1 will be the delta |
| doOnePut(P, 7, "key1"); |
| |
| // retart R |
| AsyncInvocation async3 = createDistributedRegionAsync(R); |
| |
| // when chunk arrived, do clear() |
| waitForCallbackStarted(R, GIITestHookType.AfterSavedReceivedRVV); |
| doOneClear(P, 8); |
| |
| R.invoke( |
| () -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterSavedReceivedRVV, true)); |
| async3.join(MAX_WAIT); |
| |
| // clear() increased P's version with 1 to P8 |
| // after clear, P and R's RVVGC == RVV |
| waitForToVerifyRVV(P, memberP, 8, null, 8); // P's rvv=r8, gc=8 |
| waitForToVerifyRVV(P, memberR, 6, null, 6); // P's rvv=r6, gc=6 |
| |
| RegionVersionVector p_rvv = getRVV(P); |
| RegionVersionVector r_rvv = getRVV(R); |
| assertSameRVV(p_rvv, r_rvv); // after gii, rvv should be the same |
| |
| waitForToVerifyRVV(R, memberP, 8, null, 8); // R's rvv=r8, gc=8 |
| waitForToVerifyRVV(R, memberR, 6, null, 6); // R's rvv=r6, gc=6 |
| waitToVerifyKey(P, "key1", null); |
| waitToVerifyKey(R, "key1", null); |
| verifyDeltaSizeFromStats(R, 0, 0); |
| } |
| |
| /** |
| * P and R are peers, each holds a DR. Each does a few operations to make RVV=P7,R6, RVVGC=P0,R0 |
| * for both members. R offline, then P8 is clear() operation. Run P9 is a put. Restart R. R will |
| * do fullGII since R missed a clear |
| */ |
| @Test |
| public void testFullGIIAfterClear() throws Throwable { |
| prepareForEachTest(); |
| final DiskStoreID memberP = getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| final long[] exceptionlist = {4, 5}; |
| |
| assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS); |
| prepareCommonTestData(6); |
| createUnfinishedOperationsR4R5(); |
| |
| doOnePut(P, 7, "key1"); |
| |
| waitForToVerifyRVV(P, memberP, 7, null, 0); // P's rvv=p6, gc=0 |
| waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6(3-6), gc=0 |
| waitForToVerifyRVV(R, memberP, 7, null, 0); // R's rvv=P7, gc=0 |
| waitForToVerifyRVV(R, memberR, 6, null, 0); // R's rvv=r6, gc=0 |
| |
| // shutdown R before clear |
| byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME); |
| closeCache(R); |
| |
| doOneClear(P, 8); |
| // clear() increased P's version with 1 to P8 |
| // after clear, P and R's RVVGC == RVV, no more exception |
| waitForToVerifyRVV(P, memberP, 8, null, 8); // P's rvv=r6, gc=8 |
| waitForToVerifyRVV(P, memberR, 6, null, 6); // P's rvv=r6, gc=6 |
| |
| // do a put at P to get some delta |
| doOnePut(P, 9, "key3"); |
| waitForToVerifyRVV(P, memberP, 9, null, 8); |
| waitForToVerifyRVV(P, memberR, 6, null, 6); |
| |
| // restart R to deltaGII |
| checkIfFullGII(P, REGION_NAME, R_rvv_bytes, true); |
| createDistributedRegion(R); |
| waitForToVerifyRVV(R, memberP, 9, null, 8); |
| waitForToVerifyRVV(R, memberR, 6, null, 6); |
| |
| RegionVersionVector p_rvv = getRVV(P); |
| RegionVersionVector r_rvv = getRVV(R); |
| assertSameRVV(p_rvv, r_rvv); // after gii, rvv should be the same |
| |
| verifyDeltaSizeFromStats(R, 1, 0); |
| } |
| |
| /** |
| * vm0 and vm1 are peers, each holds a DR. create some exception list. Before GII, P's RVV is |
| * P7,R6(3-6), R's RVV is P6,R6, RVVGC are both P4,R0 By changing MAX_UNFINISHED_OPERATIONS to be |
| * 1, 2. It should be fullGII then deltaGII. |
| */ |
| @Test |
| public void testFullGIITriggeredByTooManyUnfinishedOps() throws Throwable { |
| prepareForEachTest(); |
| final DiskStoreID memberP = getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| final long[] exceptionlist = {4, 5}; |
| |
| assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS); |
| prepareCommonTestData(6); |
| |
| // force tombstone GC to let RVVGC to become P4:R0 |
| forceGC(P, 2); |
| waitForToVerifyRVV(P, memberP, 6, null, 4); // P's rvv=p6, gc=4 |
| waitForToVerifyRVV(P, memberR, 3, null, 0); // P's rvv=r3, gc=0 |
| |
| createUnfinishedOperationsR4R5(); |
| |
| // now P's cache still only has key1, key3, key5 |
| closeCache(R); |
| // p7 only apply at P |
| doOnePut(P, 7, "key1"); |
| |
| // restart and gii, it should be fullGII because number of unfinished operations exceeds limit |
| changeUnfinishedOperationLimit(R, 1); |
| createDistributedRegion(R); |
| // If fullGII, the key size in gii chunk is 3, i.e. key1,key3,key5. key2 is GCed. |
| // If delta GII, the key size should be 1, i.e. P7(key1) |
| verifyDeltaSizeFromStats(R, 3, 0); |
| } |
| |
| /** |
| * P and R are peers, each holds a DR. Each does a few operations to make RVV=P6,R6, RVVGC=P0,R0 |
| * for both members. R off line, then run P7. Restart R. When P's RVV arrives, restart R. It |
| * should trigger fullGII. |
| */ |
| @Test |
| public void testRestartWithOnlyGIIBegion() throws Throwable { |
| prepareForEachTest(); |
| final DiskStoreID memberP = getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| final long[] exceptionlist = {4, 5}; |
| |
| assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS); |
| prepareCommonTestData(6); |
| // let r4,r5,r6 to succeed |
| doOnePut(R, 4, "key4"); |
| doOneDestroy(R, 5, "key5"); |
| doOnePut(R, 6, "key1"); |
| |
| waitForToVerifyRVV(P, memberP, 6, null, 0); // P's rvv=p6, gc=0 |
| waitForToVerifyRVV(P, memberR, 6, null, 0); // P's rvv=r6, gc=0 |
| waitForToVerifyRVV(R, memberP, 6, null, 0); // R's rvv=P6, gc=0 |
| waitForToVerifyRVV(R, memberR, 6, null, 0); // R's rvv=r6, gc=0 |
| |
| // set tesk hook |
| R.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| Mycallback myAfterSavedReceivedRVV = |
| new Mycallback(GIITestHookType.AfterSavedReceivedRVV, REGION_NAME); |
| InitialImageOperation.setGIITestHook(myAfterSavedReceivedRVV); |
| } |
| }); |
| |
| // shutdown R |
| closeCache(R); |
| |
| // key1 will be the delta |
| doOnePut(P, 7, "key1"); |
| |
| // retart R |
| AsyncInvocation async3 = createDistributedRegionAsync(R); |
| |
| // when chunk arrived, do clear() |
| waitForCallbackStarted(R, GIITestHookType.AfterSavedReceivedRVV); |
| |
| |
| // kill and restart R |
| closeCache(R); |
| async3.join(MAX_WAIT); |
| R.invoke( |
| () -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterSavedReceivedRVV, true)); |
| createDistributedRegion(R); |
| |
| waitForToVerifyRVV(P, memberP, 7, null, 0); // P's rvv=r8, gc=0 |
| waitForToVerifyRVV(P, memberR, 6, null, 0); // P's rvv=r6, gc=0 |
| |
| RegionVersionVector p_rvv = getRVV(P); |
| RegionVersionVector r_rvv = getRVV(R); |
| assertSameRVV(p_rvv, r_rvv); // after gii, rvv should be the same |
| |
| waitForToVerifyRVV(R, memberP, 7, null, 0); // R's rvv=r7, gc=0 |
| waitForToVerifyRVV(R, memberR, 6, null, 0); // R's rvv=r6, gc=0 |
| |
| // In fullGII, the key size in gii chunk is 2. They are: key1, key2, key3, key4, key5 |
| verifyDeltaSizeFromStats(R, 5, 0); |
| } |
| |
| /** |
| * Test the case where a member has an untrusted RVV and still initializes from the local data. |
| * See bug 48066 |
| * |
| */ |
| @Test |
| public void testRecoverFromUntrustedRVV() throws Throwable { |
| prepareForEachTest(); |
| final DiskStoreID memberP = getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| |
| assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS); |
| prepareCommonTestData(6); |
| // let r4,r5,r6 to succeed |
| doOnePut(R, 4, "key4"); |
| doOneDestroy(R, 5, "key5"); |
| doOnePut(R, 6, "key1"); |
| |
| waitForToVerifyRVV(P, memberP, 6, null, 0); // P's rvv=p6, gc=0 |
| waitForToVerifyRVV(P, memberR, 6, null, 0); // P's rvv=r6, gc=0 |
| waitForToVerifyRVV(R, memberP, 6, null, 0); // R's rvv=P6, gc=0 |
| waitForToVerifyRVV(R, memberR, 6, null, 0); // R's rvv=r6, gc=0 |
| |
| // set tesk hook |
| R.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| // Add hooks before and after receiving the RVV |
| Mycallback myBeforeSavedReceivedRVV = |
| new Mycallback(GIITestHookType.BeforeSavedReceivedRVV, REGION_NAME); |
| InitialImageOperation.setGIITestHook(myBeforeSavedReceivedRVV); |
| Mycallback myAfterSavedReceivedRVV = |
| new Mycallback(GIITestHookType.AfterSavedReceivedRVV, REGION_NAME); |
| InitialImageOperation.setGIITestHook(myAfterSavedReceivedRVV); |
| } |
| }); |
| |
| // shutdown R |
| closeCache(R); |
| |
| |
| |
| // retart R |
| AsyncInvocation async3 = createDistributedRegionAsync(R); |
| |
| // when chunk arrived, do clear() |
| waitForCallbackStarted(R, GIITestHookType.BeforeSavedReceivedRVV); |
| |
| // Before R saves the RVV, do a put. This will be recording in Rs region |
| // and RVV, but it will be wiped out in the RVV when R applies the RVV |
| // from P. |
| doOnePut(P, 7, "key1"); |
| |
| R.invoke( |
| () -> InitialImageOperation.resetGIITestHook(GIITestHookType.BeforeSavedReceivedRVV, true)); |
| |
| // Wait until the new RVV is applied |
| waitForCallbackStarted(R, GIITestHookType.AfterSavedReceivedRVV); |
| |
| // destroy the region on P (which will force R to recover with it's own |
| // data |
| destroyRegion(P); |
| |
| // Allow the GII to continue. |
| R.invoke( |
| () -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterSavedReceivedRVV, true)); |
| |
| async3.join(MAX_WAIT); |
| |
| // createDistributedRegion(R); |
| |
| // Make sure that Rs RVV now reflects the update from P |
| waitForToVerifyRVV(R, memberP, 7, null, 0); // P's rvv=r8, gc=0 |
| waitForToVerifyRVV(R, memberR, 6, null, 0); // P's rvv=r6, gc=0 |
| } |
| |
| /** |
| * Test case to make sure that if a tombstone GC occurs during a full GII, we still have the |
| * correct RVV on the GII recipient at the end. |
| * |
| */ |
| @Test |
| public void testTombstoneGCDuringFullGII() throws Throwable { |
| prepareForEachTest(); |
| |
| // Create the region in 1 more VM to to a tombstone GC. |
| VM vm2 = Host.getHost(0).getVM(2); |
| createDistributedRegion(vm2); |
| |
| final DiskStoreID memberP = getMemberID(P); |
| final DiskStoreID memberR = getMemberID(R); |
| |
| assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS); |
| prepareCommonTestData(6); |
| // All members should have "key5" at this point |
| |
| // shutdown R |
| closeCache(R); |
| |
| final VM vmR = R; |
| |
| // Destroy key5, this will leave a tombstone |
| doOneDestroy(P, 7, "key5"); |
| |
| // Set tesk hook so that R will pause GII after getting the RVV |
| R.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| // Add hooks before and after receiving the RVV |
| Mycallback myAfterSavedReceivedRVV = |
| new Mycallback(GIITestHookType.AfterCalculatedUnfinishedOps, REGION_NAME); |
| InitialImageOperation.setGIITestHook(myAfterSavedReceivedRVV); |
| } |
| }); |
| |
| |
| // Set a trigger in vm2 so that it will start up R after determining |
| // the recipients for a tombstone GC message. vm2 will wait until |
| // R has already received the RVV before sending the message. |
| vm2.invoke(new SerializableRunnable() { |
| |
| @Override |
| public void run() { |
| DistributionMessageObserver.setInstance(new DistributionMessageObserver() { |
| |
| @Override |
| public void beforeSendMessage(ClusterDistributionManager dm, |
| DistributionMessage message) { |
| if (message instanceof TombstoneMessage |
| && ((TombstoneMessage) message).regionPath.contains(REGION_NAME)) { |
| System.err.println("DAN DEBUG about to send tombstone message, starting up R - " |
| + message.getSender()); |
| AsyncInvocation async3 = createDistributedRegionAsync(vmR); |
| |
| // Wait for R to finish requesting the RVV before letting the tombstone GC proceeed. |
| waitForCallbackStarted(vmR, GIITestHookType.AfterCalculatedUnfinishedOps); |
| System.err.println("DAN DEBUG R has received the RVV, sending tombstone message"); |
| DistributionMessageObserver.setInstance(null); |
| } |
| } |
| }); |
| } |
| }); |
| |
| P.invoke(new SerializableRunnable() { |
| |
| @Override |
| public void run() { |
| DistributionMessageObserver.setInstance(new DistributionMessageObserver() { |
| @Override |
| public void afterProcessMessage(ClusterDistributionManager dm, |
| DistributionMessage message) { |
| if (message instanceof TombstoneMessage |
| && ((TombstoneMessage) message).regionPath.contains(REGION_NAME)) { |
| System.err.println( |
| "DAN DEBUG P has processed the tombstone message, allowing R to proceed with the GII"); |
| vmR.invoke(() -> InitialImageOperation |
| .resetGIITestHook(GIITestHookType.AfterCalculatedUnfinishedOps, true)); |
| DistributionMessageObserver.setInstance(null); |
| } |
| } |
| }); |
| } |
| }); |
| |
| // Force tombstone GC, this will trigger the R to be started, etc. |
| vm2.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| GemFireCacheImpl cache = (GemFireCacheImpl) getCache(); |
| try { |
| cache.getTombstoneService().forceBatchExpirationForTests(20); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| } |
| }); |
| |
| |
| |
| // Wait for P to perform the tombstone GC |
| waitForToVerifyRVV(P, memberP, 7, null, 7); |
| |
| System.err |
| .println("DAN DEBUG P has finished the tombstone GC, waiting for R to get the correct RVV"); |
| |
| // Make sure that Rs RVV now reflects the update from P |
| waitForToVerifyRVV(R, memberP, 7, null, 7); // P's rvv=r7, gc=7 |
| } |
| |
| /** |
| * Returns region attributes for a <code>GLOBAL</code> region |
| */ |
| protected RegionAttributes getRegionAttributes() { |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE); |
| factory.setConcurrencyChecksEnabled(true); |
| return factory.create(); |
| } |
| |
| protected void createDistributedRegion(VM vm) { |
| AsyncInvocation future = createDistributedRegionAsync(vm); |
| try { |
| future.join(MAX_WAIT); |
| } catch (InterruptedException e) { |
| Assert.fail("Create region is interrupted", e); |
| } |
| if (future.isAlive()) { |
| fail("Region not created within" + MAX_WAIT); |
| } |
| if (future.exceptionOccurred()) { |
| throw new RuntimeException(future.getException()); |
| } |
| } |
| |
| protected AsyncInvocation createDistributedRegionAsync(VM vm) { |
| SerializableRunnable createRegion = new SerializableRunnable("Create Region") { |
| @Override |
| public void run() { |
| try { |
| String value = |
| System.getProperty(DistributionConfig.GEMFIRE_PREFIX + "no-flush-on-close"); |
| assertNull(value); |
| RegionFactory f = getCache().createRegionFactory(getRegionAttributes()); |
| // CCRegion = (LocalRegion)f.create(REGION_NAME); |
| LocalRegion lr = (LocalRegion) f.create(REGION_NAME); |
| LogWriterUtils.getLogWriter() |
| .info("In createDistributedRegion, using hydra.getLogWriter()"); |
| LogWriterUtils.getLogWriter() |
| .fine("Unfinished Op limit=" + InitialImageOperation.MAXIMUM_UNFINISHED_OPERATIONS); |
| } catch (CacheException ex) { |
| Assert.fail("While creating region", ex); |
| } |
| } |
| }; |
| return vm.invokeAsync(createRegion); |
| } |
| |
| protected void closeCache(VM vm) { |
| SerializableRunnable close = new SerializableRunnable() { |
| @Override |
| public void run() { |
| try { |
| Cache cache = getCache(); |
| System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "no-flush-on-close", "true"); |
| cache.close(); |
| } finally { |
| System.getProperties().remove(DistributionConfig.GEMFIRE_PREFIX + "no-flush-on-close"); |
| } |
| } |
| }; |
| vm.invoke(close); |
| } |
| |
| protected void destroyRegion(VM vm) { |
| SerializableRunnable destroy = new SerializableRunnable() { |
| @Override |
| public void run() { |
| LocalRegion lr = (LocalRegion) getCache().getRegion(REGION_NAME); |
| lr.localDestroyRegion(); |
| } |
| }; |
| vm.invoke(destroy); |
| } |
| |
| protected void changeUnfinishedOperationLimit(VM vm, final int value) { |
| SerializableRunnable change = new SerializableRunnable() { |
| @Override |
| public void run() { |
| InitialImageOperation.MAXIMUM_UNFINISHED_OPERATIONS = value; |
| } |
| }; |
| vm.invoke(change); |
| } |
| |
| protected void changeTombstoneTimout(VM vm, final long value) { |
| SerializableRunnable change = new SerializableRunnable() { |
| @Override |
| public void run() { |
| TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT = value; |
| } |
| }; |
| vm.invoke(change); |
| } |
| |
| protected void changeForceFullGII(VM vm, final boolean value, final boolean checkOnly) { |
| SerializableRunnable change = new SerializableRunnable() { |
| @Override |
| public void run() { |
| if (checkOnly) { |
| assertEquals(value, InitialImageOperation.FORCE_FULL_GII); |
| } else { |
| InitialImageOperation.FORCE_FULL_GII = value; |
| } |
| } |
| }; |
| vm.invoke(change); |
| } |
| |
| protected void removeSystemPropertiesInVM(VM vm, final String prop) { |
| SerializableRunnable change = new SerializableRunnable() { |
| @Override |
| public void run() { |
| LogWriterUtils.getLogWriter() |
| .info("Current prop setting: " + prop + "=" + System.getProperty(prop)); |
| System.getProperties().remove(prop); |
| LogWriterUtils.getLogWriter().info(prop + "=" + System.getProperty(prop)); |
| } |
| }; |
| vm.invoke(change); |
| } |
| |
| protected void verifyDeltaSizeFromStats(VM vm, final int expectedKeyNum, |
| final int expectedDeltaGIINum) { |
| SerializableRunnable verify = new SerializableRunnable() { |
| @Override |
| public void run() { |
| Cache cache = getCache(); |
| // verify from CachePerfStats that certain amount of keys in delta |
| LocalRegion lr = (LocalRegion) getCache().getRegion(REGION_NAME); |
| CachePerfStats stats = lr.getRegionPerfStats(); |
| |
| // we saved GII completed count in RegionPerfStats only |
| int size = stats.getGetInitialImageKeysReceived(); |
| cache.getLogger().info("Delta contains: " + size + " keys"); |
| assertEquals(expectedKeyNum, size); |
| |
| int num = stats.getDeltaGetInitialImagesCompleted(); |
| cache.getLogger().info("Delta GII completed: " + num + " times"); |
| assertEquals(expectedDeltaGIINum, num); |
| } |
| }; |
| vm.invoke(verify); |
| } |
| |
| // P should have smaller diskstore ID than R's |
| protected void assignVMsToPandR(final VM vm0, final VM vm1) { |
| DiskStoreID dsid0 = getMemberID(vm0); |
| DiskStoreID dsid1 = getMemberID(vm1); |
| int compare = dsid0.compareTo(dsid1); |
| LogWriterUtils.getLogWriter().info("Before assignVMsToPandR, dsid0 is " + dsid0 + ",dsid1 is " |
| + dsid1 + ",compare=" + compare); |
| if (compare > 0) { |
| P = vm0; |
| R = vm1; |
| } else { |
| P = vm1; |
| R = vm0; |
| } |
| LogWriterUtils.getLogWriter().info("After assignVMsToPandR, P is " + P.getId() + "; R is " |
| + R.getId() + " for region " + REGION_NAME); |
| } |
| |
| private DiskStoreID getMemberID(VM vm) { |
| SerializableCallable getDiskStoreID = new SerializableCallable("get DiskStoreID as member id") { |
| @Override |
| public Object call() { |
| LocalRegion lr = (LocalRegion) getCache().getRegion(REGION_NAME); |
| assertTrue(lr != null && lr.getDiskStore() != null); |
| DiskStoreID dsid = lr.getDiskStore().getDiskStoreID(); |
| return dsid; |
| } |
| }; |
| return (DiskStoreID) vm.invoke(getDiskStoreID); |
| } |
| |
| private void forceGC(VM vm, final int count) { |
| vm.invoke(new SerializableCallable("force GC") { |
| @Override |
| public Object call() throws Exception { |
| ((GemFireCacheImpl) getCache()).getTombstoneService().forceBatchExpirationForTests(count); |
| return null; |
| } |
| }); |
| } |
| |
| private void forceAddGIICount(VM vm) { |
| vm.invoke(new SerializableCallable("force to add gii count") { |
| @Override |
| public Object call() throws Exception { |
| ((GemFireCacheImpl) getCache()).getTombstoneService().incrementGCBlockCount(); |
| return null; |
| } |
| }); |
| } |
| |
| private void assertDeltaGIICountBeZero(VM vm) { |
| vm.invoke(new SerializableCallable("assert progressingDeltaGIICount == 0") { |
| @Override |
| public Object call() throws Exception { |
| int count = ((GemFireCacheImpl) getCache()).getTombstoneService().getGCBlockCount(); |
| assertEquals(0, count); |
| return null; |
| } |
| }); |
| } |
| |
| public void waitForToVerifyRVV(final VM vm, final DiskStoreID member, |
| final long expectedRegionVersion, final long[] exceptionList, final long expectedGCVersion) { |
| SerializableRunnable waitForVerifyRVV = new SerializableRunnable() { |
| |
| private boolean verifyExceptionList(final DiskStoreID member, final long regionversion, |
| final RegionVersionVector rvv, final long[] exceptionList) { |
| boolean exceptionListVerified = true; |
| if (exceptionList != null) { |
| for (long i : exceptionList) { |
| exceptionListVerified = !rvv.contains(member, i); |
| if (!exceptionListVerified) { |
| LogWriterUtils.getLogWriter().finer("DeltaGII:missing exception " + i + ":" + rvv); |
| break; |
| } |
| } |
| } else { |
| // expect no exceptionlist |
| for (long i = 1; i <= regionversion; i++) { |
| if (!rvv.contains(member, i)) { |
| exceptionListVerified = false; |
| LogWriterUtils.getLogWriter().finer("DeltaGII:unexpected exception " + i); |
| break; |
| } |
| } |
| } |
| return exceptionListVerified; |
| } |
| |
| @Override |
| public void run() { |
| WaitCriterion ev = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| RegionVersionVector rvv = ((LocalRegion) getCache().getRegion(REGION_NAME)) |
| .getVersionVector().getCloneForTransmission(); |
| long regionversion = getRegionVersionForMember(rvv, member, false); |
| long gcversion = getRegionVersionForMember(rvv, member, true); |
| |
| boolean exceptionListVerified = |
| verifyExceptionList(member, regionversion, rvv, exceptionList); |
| getLogWriter() |
| .info("DeltaGII:expected:" + expectedRegionVersion + ":" + expectedGCVersion); |
| getLogWriter().info("DeltaGII:actual:" + regionversion + ":" + gcversion |
| + ":" + exceptionListVerified + ":" + rvv); |
| |
| boolean match = true; |
| if (expectedRegionVersion != -1) { |
| match = match && (regionversion == expectedRegionVersion); |
| } |
| if (expectedGCVersion != -1) { |
| match = match && (gcversion == expectedGCVersion); |
| } |
| return match && exceptionListVerified; |
| } |
| |
| @Override |
| public String description() { |
| RegionVersionVector rvv = ((LocalRegion) getCache().getRegion(REGION_NAME)) |
| .getVersionVector().getCloneForTransmission(); |
| long regionversion = getRegionVersionForMember(rvv, member, false); |
| long gcversion = getRegionVersionForMember(rvv, member, true); |
| return "expected (rv" + expectedRegionVersion + ", gc" + expectedGCVersion + ")" |
| + " got (rv" + regionversion + ", gc" + gcversion + ")"; |
| } |
| }; |
| |
| GeodeAwaitility.await().untilAsserted(ev); |
| RegionVersionVector rvv = ((LocalRegion) getCache().getRegion(REGION_NAME)) |
| .getVersionVector().getCloneForTransmission(); |
| long regionversion = getRegionVersionForMember(rvv, member, false); |
| long gcversion = getRegionVersionForMember(rvv, member, true); |
| if (expectedRegionVersion != -1) { |
| assertEquals(expectedRegionVersion, regionversion); |
| } |
| if (expectedGCVersion != -1) { |
| assertEquals(expectedGCVersion, gcversion); |
| } |
| boolean exceptionListVerified = |
| verifyExceptionList(member, regionversion, rvv, exceptionList); |
| assertTrue(exceptionListVerified); |
| } |
| }; |
| vm.invoke(waitForVerifyRVV); |
| } |
| |
| public void waitForCallbackStarted(final VM vm, final GIITestHookType callbacktype) { |
| SerializableRunnable waitForCallbackStarted = new SerializableRunnable() { |
| @Override |
| public void run() { |
| |
| final GIITestHook callback = |
| getGIITestHookForCheckingPurpose(callbacktype); |
| WaitCriterion ev = new WaitCriterion() { |
| |
| @Override |
| public boolean done() { |
| return (callback != null && callback.isRunning); |
| } |
| |
| @Override |
| public String description() { |
| return null; |
| } |
| }; |
| |
| GeodeAwaitility.await().untilAsserted(ev); |
| if (callback == null || !callback.isRunning) { |
| fail("GII tesk hook is not started yet"); |
| } |
| } |
| }; |
| vm.invoke(waitForCallbackStarted); |
| } |
| |
| private VersionTag getVersionTag(VM vm, final String key) { |
| SerializableCallable getVersionTag = new SerializableCallable("get version tag") { |
| @Override |
| public Object call() { |
| VersionTag tag = ((LocalRegion) getCache().getRegion(REGION_NAME)).getVersionTag(key); |
| return tag; |
| |
| } |
| }; |
| return (VersionTag) vm.invoke(getVersionTag); |
| } |
| |
| public void waitToVerifyKey(final VM vm, final String key, final String expect_value) { |
| SerializableRunnable waitToVerifyKey = new SerializableRunnable() { |
| @Override |
| public void run() { |
| |
| WaitCriterion ev = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| String value = (String) ((LocalRegion) getCache().getRegion(REGION_NAME)).get(key); |
| if (expect_value == null && value == null) { |
| return true; |
| } else { |
| return (value != null && value.equals(expect_value)); |
| } |
| } |
| |
| @Override |
| public String description() { |
| return null; |
| } |
| }; |
| |
| GeodeAwaitility.await().untilAsserted(ev); |
| String value = (String) ((LocalRegion) getCache().getRegion(REGION_NAME)).get(key); |
| assertEquals(expect_value, value); |
| } |
| }; |
| vm.invoke(waitToVerifyKey); |
| } |
| |
| protected byte[] getRVVByteArray(VM vm, final String regionName) |
| throws IOException, ClassNotFoundException { |
| SerializableCallable getRVVByteArray = new SerializableCallable("getRVVByteArray") { |
| |
| @Override |
| public Object call() throws Exception { |
| Cache cache = getCache(); |
| LocalRegion region = (LocalRegion) cache.getRegion(regionName); |
| RegionVersionVector rvv = region.getVersionVector(); |
| rvv = rvv.getCloneForTransmission(); |
| HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT); |
| |
| // Using gemfire serialization because |
| // RegionVersionVector is not java serializable |
| DataSerializer.writeObject(rvv, hdos); |
| return hdos.toByteArray(); |
| } |
| }; |
| byte[] result = (byte[]) vm.invoke(getRVVByteArray); |
| return result; |
| } |
| |
| protected RegionVersionVector getRVV(VM vm) throws IOException, ClassNotFoundException { |
| byte[] result = getRVVByteArray(vm, REGION_NAME); |
| ByteArrayInputStream bais = new ByteArrayInputStream(result); |
| return DataSerializer.readObject(new DataInputStream(bais)); |
| } |
| |
| protected RegionVersionVector getDiskRVV(VM vm) throws IOException, ClassNotFoundException { |
| SerializableCallable createData = new SerializableCallable("getRVV") { |
| |
| @Override |
| public Object call() throws Exception { |
| Cache cache = getCache(); |
| LocalRegion region = (LocalRegion) cache.getRegion(REGION_NAME); |
| RegionVersionVector rvv = region.getDiskRegion().getRegionVersionVector(); |
| rvv = rvv.getCloneForTransmission(); |
| HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT); |
| |
| // Using gemfire serialization because |
| // RegionVersionVector is not java serializable |
| DataSerializer.writeObject(rvv, hdos); |
| return hdos.toByteArray(); |
| } |
| }; |
| byte[] result = (byte[]) vm.invoke(createData); |
| ByteArrayInputStream bais = new ByteArrayInputStream(result); |
| return DataSerializer.readObject(new DataInputStream(bais)); |
| } |
| |
| private void checkIfFullGII(VM vm, final String regionName, final byte[] remote_rvv_bytearray, |
| final boolean expectFullGII) { |
| SerializableRunnable checkIfFullGII = new SerializableRunnable("check if full gii") { |
| @Override |
| public void run() { |
| DistributedRegion rr = (DistributedRegion) getCache().getRegion(regionName); |
| ByteArrayInputStream bais = new ByteArrayInputStream(remote_rvv_bytearray); |
| RegionVersionVector remote_rvv = null; |
| try { |
| remote_rvv = DataSerializer.readObject(new DataInputStream(bais)); |
| } catch (IOException e) { |
| Assert.fail("Unexpected exception", e); |
| } catch (ClassNotFoundException e) { |
| Assert.fail("Unexpected exception", e); |
| } |
| RequestImageMessage rim = new RequestImageMessage(); |
| rim.setSender(R_ID); |
| boolean isFullGII = rim.goWithFullGII(rr, remote_rvv); |
| assertEquals(expectFullGII, isFullGII); |
| } |
| }; |
| vm.invoke(checkIfFullGII); |
| } |
| |
| private void doOneClear(final VM vm, final long regionVersionForThisOp) { |
| SerializableRunnable clearOp = oneClearOp(regionVersionForThisOp, getMemberID(vm)); |
| vm.invoke(clearOp); |
| } |
| |
| private SerializableRunnable oneClearOp(final long regionVersionForThisOp, |
| final DiskStoreID memberID) { |
| SerializableRunnable clearOp = new SerializableRunnable("clear now") { |
| @Override |
| public void run() { |
| DistributedRegion rr = (DistributedRegion) getCache().getRegion(REGION_NAME); |
| rr.clear(); |
| long region_version = getRegionVersionForMember(rr.getVersionVector(), memberID, false); |
| long region_gc_version = getRegionVersionForMember(rr.getVersionVector(), memberID, true); |
| assertEquals(regionVersionForThisOp, region_version); |
| assertEquals(region_version, region_gc_version); |
| } |
| }; |
| return clearOp; |
| } |
| |
| private SerializableRunnable onePutOp(final String key, final String value, |
| final long regionVersionForThisOp, final DiskStoreID memberID, final boolean syncInvocation) { |
| SerializableRunnable putOp = new SerializableRunnable("put " + key) { |
| @Override |
| public void run() { |
| LocalRegion lr = (LocalRegion) getCache().getRegion(REGION_NAME); |
| lr.put(key, value); |
| long region_version = getRegionVersionForMember(lr.getVersionVector(), memberID, false); |
| if (syncInvocation) { |
| assertEquals(regionVersionForThisOp, region_version); |
| } |
| } |
| }; |
| return putOp; |
| } |
| |
| private void doOnePut(final VM vm, final long regionVersionForThisOp, final String key) { |
| SerializableRunnable putOp = |
| onePutOp(key, generateValue(vm), regionVersionForThisOp, getMemberID(vm), true); |
| vm.invoke(putOp); |
| } |
| |
| private AsyncInvocation doOnePutAsync(final VM vm, final long regionVersionForThisOp, |
| final String key) { |
| SerializableRunnable putOp = |
| onePutOp(key, generateValue(vm), regionVersionForThisOp, getMemberID(vm), false); |
| AsyncInvocation async = vm.invokeAsync(putOp); |
| return async; |
| } |
| |
| private String generateValue(final VM vm) { |
| return "VALUE from vm" + vm.getId(); |
| } |
| |
| private SerializableRunnable oneDestroyOp(final String key, final String value, |
| final long regionVersionForThisOp, final DiskStoreID memberID, final boolean syncInvocation) { |
| SerializableRunnable destroyOp = new SerializableRunnable("destroy " + key) { |
| @Override |
| public void run() { |
| LocalRegion lr = (LocalRegion) getCache().getRegion(REGION_NAME); |
| lr.destroy(key); |
| long region_version = getRegionVersionForMember(lr.getVersionVector(), memberID, false); |
| if (syncInvocation) { |
| assertEquals(regionVersionForThisOp, region_version); |
| } |
| } |
| }; |
| return destroyOp; |
| } |
| |
| private void doOneDestroy(final VM vm, final long regionVersionForThisOp, final String key) { |
| SerializableRunnable destroyOp = |
| oneDestroyOp(key, generateValue(vm), regionVersionForThisOp, getMemberID(vm), true); |
| vm.invoke(destroyOp); |
| } |
| |
| private AsyncInvocation doOneDestroyAsync(final VM vm, final long regionVersionForThisOp, |
| final String key) { |
| SerializableRunnable destroyOp = |
| oneDestroyOp(key, generateValue(vm), regionVersionForThisOp, getMemberID(vm), false); |
| AsyncInvocation async = vm.invokeAsync(destroyOp); |
| return async; |
| } |
| |
| private long getRegionVersionForMember(RegionVersionVector rvv, DiskStoreID member, |
| boolean isRVVGC) { |
| long ret = 0; |
| if (isRVVGC) { |
| ret = rvv.getGCVersion(member); |
| } else { |
| ret = rvv.getVersionForMember(member); |
| } |
| return (ret == -1 ? 0 : ret); |
| } |
| |
| private void assertSameRVV(RegionVersionVector rvv1, RegionVersionVector rvv2) { |
| if (!rvv1.sameAs(rvv2)) { |
| fail("Expected " + rvv1 + " but was " + rvv2); |
| } |
| } |
| |
| protected void verifyTombstoneExist(VM vm, final String key, final boolean expectExist, |
| final boolean expectExpired) { |
| SerializableRunnable verify = new SerializableRunnable() { |
| private boolean doneVerify() { |
| Cache cache = getCache(); |
| LocalRegion lr = (LocalRegion) getCache().getRegion(REGION_NAME); |
| NonTXEntry entry = (NonTXEntry) lr.getEntry(key, true); |
| if (expectExist) { |
| assertTrue(entry != null && entry.getRegionEntry().isTombstone()); |
| } |
| |
| System.out.println("GGG:new timeout=" + TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT); |
| if (entry == null || !entry.getRegionEntry().isTombstone()) { |
| return (false == expectExist); |
| } else { |
| long ts = entry.getRegionEntry().getVersionStamp().getVersionTimeStamp(); |
| if (expectExpired) { |
| return (ts + TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT <= ((GemFireCacheImpl) cache) |
| .cacheTimeMillis()); // use MAX_WAIT as timeout |
| } else { |
| return (true == expectExist); |
| } |
| } |
| } |
| |
| @Override |
| public void run() { |
| WaitCriterion ev = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return doneVerify(); |
| } |
| |
| @Override |
| public String description() { |
| return null; |
| } |
| }; |
| |
| GeodeAwaitility.await().untilAsserted(ev); |
| assertTrue(doneVerify()); |
| } |
| }; |
| vm.invoke(verify); |
| } |
| |
| protected int getDeltaGIICount(VM vm) { |
| SerializableCallable getDelGIICount = new SerializableCallable("getDelGIICount") { |
| @Override |
| public Object call() throws Exception { |
| GemFireCacheImpl gfc = (GemFireCacheImpl) getCache(); |
| return gfc.getTombstoneService().getGCBlockCount(); |
| } |
| }; |
| int result = (Integer) vm.invoke(getDelGIICount); |
| return result; |
| } |
| |
| private void checkAsyncCall(AsyncInvocation async) { |
| try { |
| async.join(30000); |
| if (async.exceptionOccurred()) { |
| Assert.fail("Test failed", async.getException()); |
| } |
| } catch (InterruptedException e1) { |
| Assert.fail("Test failed", e1); |
| } |
| } |
| |
| public static void slowGII(final long[] versionsToBlock) { |
| DistributionMessageObserver.setInstance(new BlockMessageObserver(versionsToBlock)); |
| } |
| |
| public static void resetSlowGII() { |
| BlockMessageObserver observer = |
| (BlockMessageObserver) DistributionMessageObserver.setInstance(null); |
| if (observer != null) { |
| observer.cdl.countDown(); |
| } |
| } |
| |
| // private void slowGII(VM vm) { |
| // SerializableRunnable slowGII = new SerializableRunnable("Slow down GII") { |
| // @SuppressWarnings("synthetic-access") |
| // public void run() { |
| // slowGII(); |
| // } |
| // }; |
| // vm.invoke(slowGII); |
| // } |
| // |
| // private void resetSlowGII(VM vm) { |
| // SerializableRunnable resetSlowGII = new SerializableRunnable("Unset the slow GII") { |
| // public void run() { |
| // resetSlowGII(); |
| // } |
| // }; |
| // vm.invoke(resetSlowGII); |
| // } |
| |
| private static class BlockMessageObserver extends DistributionMessageObserver { |
| private long[] versionsToBlock; |
| |
| CountDownLatch cdl = new CountDownLatch(1); |
| |
| BlockMessageObserver(long[] versionsToBlock) { |
| this.versionsToBlock = versionsToBlock; |
| } |
| |
| @Override |
| public void beforeSendMessage(ClusterDistributionManager dm, DistributionMessage message) { |
| VersionTag tag = null; |
| if (message instanceof UpdateMessage) { |
| UpdateMessage um = (UpdateMessage) message; |
| tag = um.getVersionTag(); |
| } else if (message instanceof DestroyMessage) { |
| DestroyMessage um = (DestroyMessage) message; |
| tag = um.getVersionTag(); |
| } else { |
| return; |
| } |
| try { |
| boolean toBlock = false; |
| for (long blockversion : versionsToBlock) { |
| if (tag.getRegionVersion() == blockversion) { |
| toBlock = true; |
| break; |
| } |
| } |
| if (toBlock) { |
| cdl.await(); |
| } |
| } catch (InterruptedException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
| private class Mycallback extends GIITestHook { |
| private Object lockObject = new Object(); |
| |
| public Mycallback(GIITestHookType type, String region_name) { |
| super(type, region_name); |
| } |
| |
| @Override |
| public void reset() { |
| synchronized (this.lockObject) { |
| this.lockObject.notify(); |
| } |
| } |
| |
| @Override |
| public void run() { |
| synchronized (this.lockObject) { |
| try { |
| isRunning = true; |
| this.lockObject.wait(); |
| } catch (InterruptedException e) { |
| } |
| } |
| } |
| } // Mycallback |
| } |