blob: aec149e4161185b7d70644f63d9223994ae1ec70 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import static java.lang.System.out;
import static org.apache.geode.internal.cache.DistributedCacheOperation.SLOW_DISTRIBUTION_MS;
import static org.apache.geode.internal.cache.InitialImageOperation.GIITestHookType.DuringPackingImage;
import static org.apache.geode.internal.cache.InitialImageOperation.getGIITestHookForCheckingPurpose;
import static org.apache.geode.internal.cache.InitialImageOperation.resetGIITestHook;
import static org.apache.geode.internal.cache.InitialImageOperation.setGIITestHook;
import static org.apache.geode.test.dunit.Host.getHost;
import static org.apache.geode.test.dunit.LogWriterUtils.getLogWriter;
import static org.apache.geode.test.dunit.Wait.pause;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.junit.Test;
import org.apache.geode.DataSerializer;
import org.apache.geode.GemFireIOException;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.Scope;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.DistributionMessageObserver;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.DestroyOperation.DestroyMessage;
import org.apache.geode.internal.cache.DistributedTombstoneOperation.TombstoneMessage;
import org.apache.geode.internal.cache.InitialImageOperation.GIITestHook;
import org.apache.geode.internal.cache.InitialImageOperation.GIITestHookType;
import org.apache.geode.internal.cache.InitialImageOperation.RequestImageMessage;
import org.apache.geode.internal.cache.UpdateOperation.UpdateMessage;
import org.apache.geode.internal.cache.entries.DiskEntry;
import org.apache.geode.internal.cache.persistence.DiskStoreID;
import org.apache.geode.internal.cache.versions.RegionVersionVector;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
public class GIIDeltaDUnitTest extends JUnit4CacheTestCase {
VM P; // GII provider
VM R; // GII requester
InternalDistributedMember R_ID; // DistributedMember ID of R
private static final long MAX_WAIT = 30000;
// protected static String REGION_NAME = GIIDeltaDUnitTest.class.getSimpleName()+"_Region";
protected static String REGION_NAME = "_Region";
final String expectedExceptions = GemFireIOException.class.getName();
protected IgnoredException expectedEx;
static Object giiSyncObject = new Object();
public GIIDeltaDUnitTest() {
super();
}
@Override
public final void postSetUp() throws Exception {
Invoke.invokeInEveryVM(GIIDeltaDUnitTest.class, "setRegionName",
new Object[] {getUniqueName()});
setRegionName(getUniqueName());
}
public static void setRegionName(String testName) {
REGION_NAME = testName + "_Region";
}
@Override
public final void preTearDownCacheTestCase() throws Exception {
P.invoke(() -> GIIDeltaDUnitTest.resetSlowGII());
R.invoke(() -> GIIDeltaDUnitTest.resetSlowGII());
P.invoke(() -> InitialImageOperation.resetAllGIITestHooks());
R.invoke(() -> InitialImageOperation.resetAllGIITestHooks());
changeUnfinishedOperationLimit(R, 10000);
changeForceFullGII(R, false, false);
changeForceFullGII(P, false, false);
P = null;
R = null;
}
@Override
public final void postTearDownCacheTestCase() throws Exception {
// clean up the test hook, which can be moved to CacheTestCase
DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0;
if (expectedEx != null) {
expectedEx.remove();
}
}
// private VersionTag getVersionTag(VM vm, final String key) {
// SerializableCallable getVersionTag = new SerializableCallable("verify recovered entry") {
// public Object call() {
// VersionTag tag = CCRegion.getVersionTag(key);
// return tag;
//
// }
// };
// return (VersionTag)vm.invoke(getVersionTag);
// }
public InternalDistributedMember getDistributedMemberID(VM vm) {
SerializableCallable getID = new SerializableCallable("get member id") {
@Override
public Object call() {
return getCache().getDistributedSystem().getDistributedMember();
}
};
InternalDistributedMember id = (InternalDistributedMember) vm.invoke(getID);
return id;
}
protected void prepareForEachTest() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
createDistributedRegion(vm0);
createDistributedRegion(vm1);
assignVMsToPandR(vm0, vm1);
// from now on, use P and R as
// vmhttps://wiki.gemstone.com/display/gfepersistence/DeltaGII+Spec+for+8.0
expectedEx = IgnoredException.addIgnoredException(expectedExceptions);
}
// these steps are shared by all test cases
private void prepareCommonTestData(int p_version) {
// operation P1: region.put("key") at P. After the operation, region version for P becomes 1
R_ID = getDistributedMemberID(R);
assertDeltaGIICountBeZero(P);
assertDeltaGIICountBeZero(R);
doOnePut(P, 1, "key1");
doOnePut(R, 1, "key2");
doOnePut(R, 2, "key5");
createConflictOperationsP2R3();
for (int i = 3; i <= p_version; i++) {
switch (i) {
case 3:
doOneDestroy(P, 3, "key1");
break;
case 4:
doOneDestroy(P, 4, "key2");
break;
case 5:
doOnePut(P, 5, "key1");
break;
case 6:
doOnePut(P, 6, "key3");
default:
// donothing
}
}
// at this moment, cache has key1, key3, key5
}
private void createConflictOperationsP2R3() {
final DiskStoreID memberP = getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
final long blocklist[] = {2, 3};
P.invoke(() -> GIIDeltaDUnitTest.slowGII(blocklist));
R.invoke(() -> GIIDeltaDUnitTest.slowGII(blocklist));
AsyncInvocation async1 = doOnePutAsync(P, 2, "key1");
AsyncInvocation async2 = doOnePutAsync(R, 3, "key1");
// wait for the local puts are done at P & R before distribution
waitForToVerifyRVV(P, memberP, 2, null, 0); // P's rvv=p2, gc=0
waitForToVerifyRVV(P, memberR, 2, null, 0); // P's rvv=r2, gc=0
waitForToVerifyRVV(R, memberP, 1, null, 0); // P's rvv=p2, gc=0
waitForToVerifyRVV(R, memberR, 3, null, 0); // P's rvv=r2, gc=0
// new Object[] { memberP, 2, 3, 0, 0, false }
P.invoke(() -> GIIDeltaDUnitTest.resetSlowGII());
R.invoke(() -> GIIDeltaDUnitTest.resetSlowGII());
// should wait for async calls to finish before going on
checkAsyncCall(async1);
checkAsyncCall(async2);
// verify RVVs
waitForToVerifyRVV(P, memberP, 2, null, 0); // P's rvv=p2, gc=0
waitForToVerifyRVV(P, memberR, 3, null, 0); // P's rvv=r3, gc=0
waitForToVerifyRVV(R, memberP, 2, null, 0); // P's rvv=p2, gc=0
waitForToVerifyRVV(R, memberR, 3, null, 0); // P's rvv=r3, gc=0
// verify P won the conflict check
waitToVerifyKey(P, "key1", generateValue(P));
waitToVerifyKey(R, "key1", generateValue(P));
}
private void createUnfinishedOperationsR4R5() {
final DiskStoreID memberP = getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
final long[] exceptionlist = {4, 5};
// let r6 to succeed, r4,r5 to be blocked
R.invoke(() -> GIIDeltaDUnitTest.slowGII(exceptionlist));
AsyncInvocation async1 = doOnePutAsync(R, 4, "key4");
waitForToVerifyRVV(R, memberR, 4, null, 0); // P's rvv=r4, gc=0
AsyncInvocation async2 = doOneDestroyAsync(R, 5, "key5");
waitForToVerifyRVV(R, memberR, 5, null, 0); // P's rvv=r5, gc=0
doOnePut(R, 6, "key1"); // r6 will pass
waitForToVerifyRVV(R, memberR, 6, null, 0); // P's rvv=r6, gc=0
// P should have exception list R4,R5 now
waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6(3-6), gc=0
}
/**
* vm0 and vm1 are peers, each holds a DR. Each does a few operations to make RVV=P7,R6,
* RVVGC=P4,R0 for both members. vm1 becomes offline then restarts. The deltaGII should only
* exchange RVV. No need to send data from vm0 to vm1.
*/
@Test
public void testDeltaGIIWithSameRVV() throws Throwable {
prepareForEachTest();
final DiskStoreID memberP = getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS);
prepareCommonTestData(6);
// force tombstone GC to let RVVGC to become P4:R0
forceGC(P, 2);
waitForToVerifyRVV(P, memberP, 6, null, 4); // P's rvv=p6, gc=4
waitForToVerifyRVV(P, memberR, 3, null, 0); // P's rvv=r3, gc=0
// let r4,r5,r6 to succeed
doOnePut(R, 4, "key4");
doOneDestroy(R, 5, "key5");
doOnePut(R, 6, "key1");
// let p7 to succeed
doOnePut(P, 7, "key1");
waitForToVerifyRVV(P, memberP, 7, null, 4); // P's rvv=p7, gc=4
waitForToVerifyRVV(P, memberR, 6, null, 0); // P's rvv=r6, gc=0
waitForToVerifyRVV(R, memberP, 7, null, 4); // P's rvv=p7, gc=4
waitForToVerifyRVV(R, memberR, 6, null, 0); // P's rvv=r6, gc=0
// now the rvv and rvvgc at P and R should be the same
// save R's rvv in byte array, check if it will be fullGII
byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME);
// shutdown R and restart
closeCache(R);
checkIfFullGII(P, REGION_NAME, R_rvv_bytes, false);
createDistributedRegion(R);
waitForToVerifyRVV(R, memberP, 7, null, 4); // P's rvv=p7, gc=4
waitForToVerifyRVV(R, memberR, 6, null, 0); // P's rvv=r6, gc=0
RegionVersionVector p_rvv = getRVV(P);
RegionVersionVector r_rvv = getRVV(R);
assertSameRVV(p_rvv, r_rvv); // after gii, rvv should be the same
// If fullGII, the key size in gii chunk is 4, i.e. key1,key3,key4,key5(tombstone). key2 is
// GCed.
// If delta GII, the key size should be 0
verifyDeltaSizeFromStats(R, 0, 1);
}
/**
* vm0 and vm1 are peers, each holds a DR. create some exception list. Before GII, P's RVV is
* P7,R6(3-6), R's RVV is P6,R6, RVVGC are both P4,R0 vm1 becomes offline then restarts.
* https://wiki.gemstone.com/display/gfepersistence/DeltaGII+Spec+for+8.0 The deltaGII should send
* delta to R, revoke unfinished opeation R4,R5
*/
@Test
public void testDeltaGIIWithExceptionList() throws Throwable {
prepareForEachTest();
final DiskStoreID memberP = getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
final long[] exceptionlist = {4, 5};
assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS);
prepareCommonTestData(6);
VersionTag expect_tag = getVersionTag(R, "key5");
// force tombstone GC to let RVVGC to become P4:R0
forceGC(P, 2);
waitForToVerifyRVV(P, memberP, 6, null, 4); // P's rvv=p6, gc=4
waitForToVerifyRVV(P, memberR, 3, null, 0); // P's rvv=r3, gc=0
createUnfinishedOperationsR4R5();
// now P's cache still only has key1, key3, key5
byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME);
closeCache(R);
// p7 only apply at P
doOnePut(P, 7, "key1");
// restart and gii
checkIfFullGII(P, REGION_NAME, R_rvv_bytes, false);
createDistributedRegion(R);
waitForToVerifyRVV(P, memberP, 7, null, 4); // P's rvv=p7, gc=4
waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6, gc=0
waitForToVerifyRVV(R, memberP, 7, null, 4); // R's rvv=p7, gc=4
waitForToVerifyRVV(R, memberR, 6, exceptionlist, 0); // R's rvv=r6, gc=0
RegionVersionVector p_rvv = getRVV(P);
RegionVersionVector r_rvv = getRVV(R);
assertSameRVV(p_rvv, r_rvv);
// If fullGII, the key size in gii chunk is 3, i.e. key1,key3,key5. key2 is GCed.
// If delta GII, the key size should be 2, i.e. P7(key1) and (key5(T) which is unfinished
// operation)
verifyDeltaSizeFromStats(R, 2, 1);
// verify unfinished op for key5 is revoked
waitToVerifyKey(R, "key5", generateValue(R));
VersionTag tag = getVersionTag(R, "key5");
assertTrue(expect_tag.equals(tag));
// restart P, since R has received exceptionlist R4,R5 from P
closeCache(P);
createDistributedRegion(P);
waitForToVerifyRVV(P, memberP, 7, null, 4); // P's rvv=p7, gc=4
waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6, gc=0
// If fullGII, the key size in gii chunk is 3, i.e. key1,key3,key5. key4 is removed as
// unfinished op
// If deltaGII, the key size should be 0
verifyDeltaSizeFromStats(P, 0, 1);
}
/**
* vm0 and vm1 are peers, each holds a DR. create some exception list. Before GII, P's RVV is
* P6,R6(3-6), R's RVV is P6,R6, RVVGC are both P4,R0 vm1 becomes offline then restarts. The
* deltaGII should send delta which only contains unfinished operation R4,R5
*/
@Test
public void testDeltaGIIWithOnlyUnfinishedOp() throws Throwable {
prepareForEachTest();
final DiskStoreID memberP = getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
final long[] exceptionlist = {4, 5};
assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS);
prepareCommonTestData(6);
VersionTag expect_tag = getVersionTag(R, "key5");
// force tombstone GC to let RVVGC to become P4:R0
forceGC(P, 2);
waitForToVerifyRVV(P, memberP, 6, null, 4); // P's rvv=p6, gc=4
waitForToVerifyRVV(P, memberR, 3, null, 0); // P's rvv=r3, gc=0
createUnfinishedOperationsR4R5();
// now P's cache still only has key1, key3, key5
byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME);
closeCache(R);
// restart and gii
checkIfFullGII(P, REGION_NAME, R_rvv_bytes, false);
createDistributedRegion(R);
waitForToVerifyRVV(P, memberP, 6, null, 4); // P's rvv=p6, gc=4
waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6, gc=0
waitForToVerifyRVV(R, memberP, 6, null, 4); // R's rvv=p6, gc=4
waitForToVerifyRVV(R, memberR, 6, exceptionlist, 0); // R's rvv=r6, gc=0
RegionVersionVector p_rvv = getRVV(P);
RegionVersionVector r_rvv = getRVV(R);
assertSameRVV(p_rvv, r_rvv);
// If fullGII, the key size in gii chunk is 3, i.e. key1,key3,key5. key2 is GCed.
// If delta GII, the key size should be 1 (key5(T) which is unfinished operation)
verifyDeltaSizeFromStats(R, 1, 1);
// verify unfinished op for key5 is revoked
waitToVerifyKey(R, "key5", generateValue(R));
VersionTag tag = getVersionTag(R, "key5");
assertTrue(expect_tag.equals(tag));
// restart P, since R has received exceptionlist R4,R5 from P
closeCache(P);
createDistributedRegion(P);
waitForToVerifyRVV(P, memberP, 6, null, 4); // P's rvv=p6, gc=4
waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6, gc=0
// If fullGII, the key size in gii chunk is 3, i.e. key1,key3,key5. key4 is removed as
// unfinished op
// If deltaGII, the key size should be 0
verifyDeltaSizeFromStats(P, 0, 1);
// restart R, to make sure the unfinished op is handled correctly
forceGC(R, 1); // for bug 47616
waitForToVerifyRVV(P, memberR, 6, null, 5); // P's rvv=R6, gc=5
waitForToVerifyRVV(R, memberR, 6, null, 5); // P's rvv=R6, gc=5
closeCache(R);
createDistributedRegion(R);
// If fullGII, the key size in gii chunk is 3, i.e. key1,key3,key5. key4 is removed as
// unfinished op
// If deltaGII, the key size should be 0
verifyDeltaSizeFromStats(R, 0, 1);
// verify unfinished op for key5 is revoked
waitToVerifyKey(R, "key5", generateValue(R));
tag = getVersionTag(R, "key5");
assertTrue(expect_tag.equals(tag));
}
/**
* This is to test a race condition for bug#47616 vm0 and vm1 are peers, each holds a DR. create
* some exception list. Before GII, P's RVV is P6,R6(3-6), R's RVV is P6,R6, RVVGC are both P4,R0
* vm1 becomes offline then restarts. The deltaGII should send delta which only contains
* unfinished opeation R4,R5
*/
@Test
public void testDeltaGIIWithOnlyUnfinishedOp_GCAtR() throws Throwable {
prepareForEachTest();
final DiskStoreID memberP = getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
final long[] exceptionlist = {4, 5};
assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS);
prepareCommonTestData(6);
VersionTag expect_tag = getVersionTag(R, "key5");
// force tombstone GC to let RVVGC to become P4:R0
forceGC(P, 2);
waitForToVerifyRVV(P, memberP, 6, null, 4); // P's rvv=p6, gc=4
waitForToVerifyRVV(P, memberR, 3, null, 0); // P's rvv=r3, gc=0
createUnfinishedOperationsR4R5();
waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6, gc=0
// 2
R.invoke(new SerializableRunnable() {
@Override
public void run() {
Mycallback myAfterRequestRVV = new Mycallback(GIITestHookType.AfterRequestRVV, REGION_NAME);
InitialImageOperation.setGIITestHook(myAfterRequestRVV);
}
});
// now P's cache still only has key1, key3, key5
byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME);
closeCache(R);
// restart and gii
checkIfFullGII(P, REGION_NAME, R_rvv_bytes, false);
AsyncInvocation async3 = createDistributedRegionAsync(R);
// 2
waitForCallbackStarted(R, GIITestHookType.AfterRequestRVV);
forceGC(R, 1);
R.invoke(() -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterRequestRVV, true));
async3.join(MAX_WAIT);
// verify unfinished op for key5 is revoked
waitToVerifyKey(R, "key5", generateValue(R));
VersionTag tag = getVersionTag(R, "key5");
assertTrue(expect_tag.equals(tag));
verifyTombstoneExist(R, "key5", false, false);
// If fullGII, the key size in gii chunk is 3, i.e. key1,key3,key5. key2 is GCed.
// If delta GII, the key size should be 1 (key5(T) which is unfinished operation)
verifyDeltaSizeFromStats(R, 3, 0);
}
/**
* vm0 and vm1 are peers, each holds a DR. create some exception list. Then shutdown R. Do
* tombstone GC at P only. Before GII, P's RVV is P7,R6(3-6), RVVGC is P4,R0; R's RVV is P6,R6,
* RVVGC are both P0,R0 vm1 becomes offline then restarts.
* https://wiki.gemstone.com/display/gfepersistence/DeltaGII+Spec+for+8.0 The deltaGII should send
* delta to R, revoke unfinished opeation R4,R5
*/
@Test
public void testDeltaGIIWithDifferentRVVGC() throws Throwable {
final String testcase = "testDeltaGIIWithExceptionList";
prepareForEachTest();
final DiskStoreID memberP = getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
final long[] exceptionlist = {4, 5};
assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS);
prepareCommonTestData(6);
waitForToVerifyRVV(P, memberP, 6, null, 0); // P's rvv=p6, gc=0
VersionTag expect_tag = getVersionTag(R, "key5");
createUnfinishedOperationsR4R5();
byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME);
closeCache(R);
// force tombstone GC to let RVVGC to become P4:R0
changeTombstoneTimout(R, MAX_WAIT);
changeTombstoneTimout(P, MAX_WAIT);
Wait.pause((int) MAX_WAIT);
forceGC(P, 2);
waitForToVerifyRVV(P, memberP, 6, null, 4); // P's rvv=p6, gc=4
// p7 only apply at P
doOnePut(P, 7, "key1");
waitForToVerifyRVV(P, memberP, 7, null, 4); // P's rvv=p7, gc=4
verifyTombstoneExist(P, "key2", false, false);
// restart R and gii
checkIfFullGII(P, REGION_NAME, R_rvv_bytes, false);
createDistributedRegion(R);
waitForToVerifyRVV(R, memberP, 7, null, 4); // R's rvv=p7, gc=4
waitForToVerifyRVV(R, memberR, 6, exceptionlist, 0); // R's rvv=r6, gc=0
RegionVersionVector p_rvv = getRVV(P);
RegionVersionVector r_rvv = getRVV(R);
assertSameRVV(p_rvv, r_rvv);
// If fullGII, the key size in gii chunk is 3, i.e. key1,key3,key5. key2 is GCed.
// If delta GII, the key size should be 2, i.e. P7 (key1) and (key5(T) which is unfinished
// operation)
verifyDeltaSizeFromStats(R, 2, 1);
// verify unfinished op for key5 is revoked
waitToVerifyKey(R, "key5", generateValue(R));
VersionTag tag = getVersionTag(R, "key5");
assertTrue(expect_tag.equals(tag));
verifyTombstoneExist(R, "key2", false, false);
verifyTombstoneExist(P, "key2", false, false);
}
/**
* vm0 and vm1 are peers, each holds a DR. Let provider to have higher RVVGC than requester's RVV
* It should trigger fullGII
*
* This test also verify tombstoneGC happened in middle of GII, but BEFORE GII thread got GIILock.
* i.e. before GII, P's RVVGC=P0,R0, upon received RequestImageMessage, it becomes P4,R0 it should
* cause the fullGII.
*/
@Test
public void testFullGIITriggeredByHigherRVVGC() throws Throwable {
prepareForEachTest();
final DiskStoreID memberP = getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
final long[] exceptionlist = {4, 5};
assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS);
prepareCommonTestData(3);
waitForToVerifyRVV(P, memberP, 3, null, 0); // P's rvv=p3, gc=0
createUnfinishedOperationsR4R5();
waitForToVerifyRVV(R, memberP, 3, null, 0); // R's rvv=p3, gc=0
byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME);
closeCache(R);
// p4-7 only apply at P
doOneDestroy(P, 4, "key2");
doOnePut(P, 5, "key1");
doOnePut(P, 6, "key3");
doOnePut(P, 7, "key1");
// add test hook
// 7
P.invoke(new SerializableRunnable() {
@Override
public void run() {
Mycallback myAfterReceivedRequestImage =
new Mycallback(GIITestHookType.AfterReceivedRequestImage, REGION_NAME);
InitialImageOperation.setGIITestHook(myAfterReceivedRequestImage);
}
});
// restart R and gii, it will be blocked at test hook
AsyncInvocation async3 = createDistributedRegionAsync(R);
// 7
waitForCallbackStarted(P, GIITestHookType.AfterReceivedRequestImage);
// force tombstone GC to let RVVGC to become P4:R0, but R already sent its old RVV/RVVGC over
// this tombstone GC happens BEFORE GII thread got the GIILock, so it will be fullGCC
forceGC(P, 2);
waitForToVerifyRVV(P, memberP, 7, null, 4); // P's rvv=p7, gc=4
waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6(3-6), gc=0
checkIfFullGII(P, REGION_NAME, R_rvv_bytes, true);
// let GII continue
P.invoke(() -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterReceivedRequestImage,
true));
async3.join(MAX_WAIT);
waitForToVerifyRVV(R, memberP, 7, null, 4); // R's rvv=p7, gc=4
waitForToVerifyRVV(R, memberR, 6, exceptionlist, 0); // R's rvv=r6, gc=0
RegionVersionVector p_rvv = getRVV(P);
RegionVersionVector r_rvv = getRVV(R);
assertSameRVV(p_rvv, r_rvv);
// In fullGII, the key size in gii chunk is 3, i.e. key1,key3,key5. key2 is GCed.
verifyDeltaSizeFromStats(R, 3, 0);
}
/**
* vm0(P), vm1(R), vm2(T) are peers, each holds a DR. shutdown vm1(R), vm2(T) Let provider to have
* higher RVVGC than requester's RVV when vm1 and vm2 are offline Restart R, It should trigger
* fullGII If R did not save the rvvgc, restarted R will have a way smaller rvvgc (maybe the same
* as T's) Let T requests GII from R, it wll become deltaGII, which is wrong.
*/
@Test
public void testSavingRVVGC() throws Throwable {
prepareForEachTest();
final DiskStoreID memberP = getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
final long[] exceptionlist = {4, 5};
Host host = Host.getHost(0);
VM T = host.getVM(2);
createDistributedRegion(T);
final DiskStoreID memberT = getMemberID(T);
assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS);
prepareCommonTestData(3);
waitForToVerifyRVV(P, memberP, 3, null, 0); // P's rvv=p3, gc=0
waitForToVerifyRVV(R, memberP, 3, null, 0); // R's rvv=p3, gc=0
waitForToVerifyRVV(T, memberP, 3, null, 0); // T's rvv=p3, gc=0
closeCache(T);
closeCache(R);
// p4-7 only apply at P
doOneDestroy(P, 4, "key2");
doOnePut(P, 5, "key1");
doOnePut(P, 6, "key3");
doOnePut(P, 7, "key1");
// force tombstone GC to let RVVGC to become P4:R0
forceGC(P, 2);
// restart R and gii, it will be blocked at test hook
createDistributedRegion(R);
waitForToVerifyRVV(P, memberP, 7, null, 4); // P's rvv=p7, gc=4
waitForToVerifyRVV(P, memberR, 3, null, 0); // P's rvv=r3, gc=0
waitForToVerifyRVV(R, memberP, 7, null, 4); // R's rvv=p7, gc=4
waitForToVerifyRVV(R, memberR, 3, null, 0); // R's rvv=r3, gc=0
// RegionVersionVector p_rvv = getRVV(P);
// RegionVersionVector r_rvv = getRVV(R);
// assertSameRVV(p_rvv, r_rvv);
// In fullGII, the key size in gii chunk is 3, i.e. key1,key3,key5. key2 is GCed.
verifyDeltaSizeFromStats(R, 3, 0);
verifyTombstoneExist(P, "key2", false, false);
verifyTombstoneExist(R, "key2", false, false);
closeCache(P);
closeCache(R);
createDistributedRegion(R);
waitForToVerifyRVV(R, memberP, 7, null, 4); // R's rvv=p7, gc=4
waitForToVerifyRVV(R, memberR, 3, null, 0); // R's rvv=r3, gc=0
verifyTombstoneExist(R, "key2", false, false);
createDistributedRegion(T);
waitForToVerifyRVV(T, memberP, 7, null, 4); // R's rvv=p7, gc=4
waitForToVerifyRVV(T, memberR, 3, null, 0); // R's rvv=r3, gc=0
// In fullGII, the key size in gii chunk is 3, i.e. key1,key3,key5. key2 is GCed.
verifyDeltaSizeFromStats(T, 3, 0);
verifyTombstoneExist(T, "key2", false, false);
}
/**
* vm0 and vm1 are peers, each holds a DR. create some exception list. Not to do any tombstone GC
* Before GII, P's RVV is P7,R6(3-6), R's RVV is P6,R6, RVVGC are both P0,R0 vm1 becomes offline
* then restarts. https://wiki.gemstone.com/display/gfepersistence/DeltaGII+Spec+for+8.0 The
* deltaGII should send delta to R, revoke unfinished opeation R4,R5
*/
@Test
public void testDeltaGIIWithoutRVVGC() throws Throwable {
prepareForEachTest();
final DiskStoreID memberP = getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
final long[] exceptionlist = {4, 5};
assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS);
prepareCommonTestData(6);
waitForToVerifyRVV(P, memberP, 6, null, 0); // P's rvv=p6, gc=0
waitForToVerifyRVV(P, memberR, 3, null, 0); // P's rvv=r3, gc=0
VersionTag expect_tag = getVersionTag(R, "key5");
createUnfinishedOperationsR4R5();
// now P's cache still only has key1, key3
byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME);
closeCache(R);
// p7 only apply at P
doOnePut(P, 7, "key1");
// restart R and gii
checkIfFullGII(P, REGION_NAME, R_rvv_bytes, false);
createDistributedRegion(R);
waitForToVerifyRVV(P, memberP, 7, null, 0); // P's rvv=p7, gc=0
waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6, gc=0
waitForToVerifyRVV(R, memberP, 7, null, 0); // R's rvv=p7, gc=0
waitForToVerifyRVV(R, memberR, 6, exceptionlist, 0); // R's rvv=r6, gc=0
RegionVersionVector p_rvv = getRVV(P);
RegionVersionVector r_rvv = getRVV(R);
assertSameRVV(p_rvv, r_rvv);
// If fullGII, the key size in gii chunk is 4.
// tombstone counts in deltaSize, so without rvvgc, it's 4: key1, key2(tombstone), key3, key5
// In delta GII, it should be 1, i.e. P7 (key1) and (key5(T) which is unfinished operation)
verifyDeltaSizeFromStats(R, 2, 1);
// verify unfinished op for key5 is revoked
waitToVerifyKey(R, "key5", generateValue(R));
VersionTag tag = getVersionTag(R, "key5");
assertTrue(expect_tag.equals(tag));
}
/**
* vm0 and vm1 are peers, each holds a DR. unifinished P8: destroy(key1), finished P9: put(key3).
* Shutdown R, then GC tombstones at P. P's RVV=P9,R6(3-6), RVVGC=P8,R0, R's RVV=P9(7-9),R6, RVV
* It should trigger fullGII
*/
@Test
public void testFullGIINotDorminatedByProviderRVVGC() throws Throwable {
prepareForEachTest();
final DiskStoreID memberP = getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
final long[] exceptionlist = {4, 5};
assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS);
prepareCommonTestData(3);
createUnfinishedOperationsR4R5();
waitForToVerifyRVV(P, memberP, 3, null, 0); // P's rvv=p3, gc=0
waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6(3-6), gc=0
waitForToVerifyRVV(R, memberP, 3, null, 0); // R's rvv=p3, gc=0
waitForToVerifyRVV(R, memberR, 6, null, 0); // R's rvv=r6, gc=0
// p4-7 only apply at P
doOneDestroy(P, 4, "key2");
doOnePut(P, 5, "key1");
doOnePut(P, 6, "key3");
doOnePut(P, 7, "key1");
final long[] exceptionlist2 = {8};
// let p9 to succeed, p8 to be blocked
P.invoke(() -> GIIDeltaDUnitTest.slowGII(exceptionlist2));
AsyncInvocation async1 = doOneDestroyAsync(P, 8, "key1");
waitForToVerifyRVV(P, memberP, 8, null, 0);
doOnePut(P, 9, "key3");
waitForToVerifyRVV(P, memberP, 9, null, 0);
waitForToVerifyRVV(R, memberP, 9, exceptionlist2, 0);
byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME);
closeCache(R);
forceGC(P, 3);
// now P's RVV=P9,R6(3-6), RVVGC=P8,R0, R's RVV=P9(7-9), R6
waitForToVerifyRVV(P, memberP, 9, null, 8); // P's rvv=p9, gc=8
waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6(3-6), gc=0
P.invoke(() -> GIIDeltaDUnitTest.resetSlowGII());
// restart and gii, R's rvv should be the same as P's
checkIfFullGII(P, REGION_NAME, R_rvv_bytes, true);
createDistributedRegion(R);
waitForToVerifyRVV(R, memberP, 9, null, 8); // R's rvv=p9, gc=8
waitForToVerifyRVV(R, memberR, 6, exceptionlist, 0); // R's rvv=r6, gc=0
RegionVersionVector p_rvv = getRVV(P);
RegionVersionVector r_rvv = getRVV(R);
assertSameRVV(p_rvv, r_rvv); // after gii, rvv should be the same
// In fullGII, the key size in gii chunk is 2. They are: key3, key5
verifyDeltaSizeFromStats(R, 2, 0);
}
/**
* Let R4, R5 unfinish, but R5 is the last operation from R. So P's RVV is still P:x,R3, without
* exception list. But actually R4, R5 are unfinished ops by all means.
*/
@Test
public void testUnfinishedOpsWithoutExceptionList() throws Throwable {
prepareForEachTest();
final DiskStoreID memberP = getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS);
prepareCommonTestData(6);
VersionTag expect_tag = getVersionTag(R, "key5");
final long[] exceptionlist = {4, 5};
R.invoke(() -> GIIDeltaDUnitTest.slowGII(exceptionlist));
AsyncInvocation async1 = doOnePutAsync(R, 4, "key4");
waitForToVerifyRVV(R, memberR, 4, null, 0); // P's rvv=r4, gc=0
AsyncInvocation async2 = doOneDestroyAsync(R, 5, "key5");
waitForToVerifyRVV(R, memberR, 5, null, 0); // P's rvv=r5, gc=0
// P should have unfinished ops R4,R5, but they did not show up in exception list
waitForToVerifyRVV(P, memberR, 3, null, 0); // P's rvv=r3, gc=0
// let p7 to succeed
doOnePut(P, 7, "key1");
waitForToVerifyRVV(P, memberP, 7, null, 0); // P's rvv=p7, gc=0
waitForToVerifyRVV(P, memberR, 3, null, 0); // P's rvv=r3, gc=0
waitForToVerifyRVV(R, memberP, 7, null, 0); // R's rvv=p7, gc=0
waitForToVerifyRVV(R, memberR, 5, null, 0); // R's rvv=r3, gc=0
// now P's rvv=P7,R3, R's RVV=P7,R5
// shutdown R and restart
byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME);
closeCache(R);
checkIfFullGII(P, REGION_NAME, R_rvv_bytes, false);
createDistributedRegion(R);
waitForToVerifyRVV(R, memberP, 7, null, 0); // P's rvv=p7, gc=0
waitForToVerifyRVV(R, memberR, 3, exceptionlist, 0); // P's rvv=r3, gc=0
RegionVersionVector p_rvv = getRVV(P);
RegionVersionVector r_rvv = getRVV(R);
assertSameRVV(p_rvv, r_rvv); // after gii, rvv should be the same
// full GII chunk has 4 keys: key1,2(tombstone),3,5
// delta GII chunk has 1 key, i.e. (key5(T) which is unfinished operation)
verifyDeltaSizeFromStats(R, 1, 1);
// verify unfinished op for key5 is revoked
waitToVerifyKey(R, "key5", generateValue(R));
VersionTag tag = getVersionTag(R, "key5");
assertTrue(expect_tag.equals(tag));
// shutdown R again and restart, to verify localVersion=5 will be saved and recovered
closeCache(R);
createDistributedRegion(R);
// P will receive R6 and have exception R6(3-6)
doOnePut(R, 6, "key1"); // r6 will pass
waitForToVerifyRVV(R, memberR, 6, exceptionlist, 0); // R's rvv=r6, gc=0
waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6(3-6), gc=0
}
/**
* P, R has key1 unchanged, make certain unfinished operations on R to trigger full gii.
*/
@Test
public void testRecoveredFromDiskBitAfterFullGII() {
prepareForEachTest();
getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS);
verifyRecoveredFromDiskBitAfterGII(memberR, true);
}
/**
* P, R has key1 unchanged, make one unfinished operation on R to trigger delta gii.
*/
@Test
public void testRecoveredFromDiskBitAfterDeltaGII() {
prepareForEachTest();
getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS);
verifyRecoveredFromDiskBitAfterGII(memberR, false);
}
private void verifyRecoveredFromDiskBitAfterGII(final DiskStoreID memberR, boolean isFullGII) {
final long[] exceptionlist = {3, 4};
doOnePut(R, 1, "key1");
doOnePut(R, 2, "key2");
R.invoke(() -> GIIDeltaDUnitTest.slowGII(exceptionlist));
doOnePutAsync(R, 3, "key2");
waitForToVerifyRVV(R, memberR, 3, null, 0); // R's rvv=r3, gc=0
if (isFullGII) {
doOnePutAsync(R, 4, "key2");
waitForToVerifyRVV(R, memberR, 4, null, 0); // R's rvv=r4, gc=0
}
closeCache(R);
doOnePut(P, 1, "key2");
// restart R
createDistributedRegion(R);
R.invoke(() -> verifyRecoveredFromDiskBit("key2", isFullGII, true));
R.invoke(() -> verifyRecoveredFromDiskBit("key1", isFullGII, false));
}
private void verifyRecoveredFromDiskBit(String key, boolean isFullGII,
boolean versionTagBeenChanged) {
LocalRegion lr = (LocalRegion) getCache().getRegion(REGION_NAME);
DiskEntry re = (DiskEntry) lr.getRegionEntry(key);
DiskId id = re.getDiskId();
byte usebits = id.getUserBits();
if (!versionTagBeenChanged && !isFullGII) {
// in delta gii, versionTag not changed entry should kept the recoveredFromDisk bit
assertTrue(EntryBits.isRecoveredFromDisk(usebits));
} else {
assertFalse(EntryBits.isRecoveredFromDisk(usebits));
}
}
/**
* P1, P2, P3 R does GII but wait at BeforeSavedReceivedRVV, so R's RVV=P3R0 P4, P5 R goes on to
* save received RVV. R's new RVV=P5(3-6)R0
*
* When image (which contains P4, P5) arrives, it should fill the special exception
*/
@Test
public void testFillSpecialException() throws Throwable {
prepareForEachTest();
final DiskStoreID memberP = getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
final long[] exceptionlist = {4, 5};
doOnePut(P, 1, "key1");
doOnePut(P, 2, "key2");
doOneDestroy(P, 3, "key1");
changeForceFullGII(R, false, true);
// 4
R.invoke(new SerializableRunnable() {
@Override
public void run() {
Mycallback myBeforeSavedReceivedRVV =
new Mycallback(GIITestHookType.BeforeSavedReceivedRVV, REGION_NAME);
InitialImageOperation.setGIITestHook(myBeforeSavedReceivedRVV);
}
});
// 5
R.invoke(new SerializableRunnable() {
@Override
public void run() {
Mycallback myAfterSavedReceivedRVV =
new Mycallback(GIITestHookType.AfterSavedReceivedRVV, REGION_NAME);
InitialImageOperation.setGIITestHook(myAfterSavedReceivedRVV);
}
});
closeCache(R);
changeForceFullGII(R, true, false);
AsyncInvocation async3 = createDistributedRegionAsync(R);
waitForCallbackStarted(R, GIITestHookType.BeforeSavedReceivedRVV);
doOneDestroy(P, 4, "key2");
doOnePut(P, 5, "key1");
R.invoke(
() -> InitialImageOperation.resetGIITestHook(GIITestHookType.BeforeSavedReceivedRVV, true));
waitForCallbackStarted(R, GIITestHookType.AfterSavedReceivedRVV);
R.invoke(
() -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterSavedReceivedRVV, true));
async3.join(MAX_WAIT);
waitForToVerifyRVV(R, memberP, 5, null, 0); // P's rvv=r5, gc=0
changeForceFullGII(R, true, true);
changeForceFullGII(P, false, true);
verifyDeltaSizeFromStats(R, 2, 0);
}
/**
* P1, P2, P3 R does GII but wait at BeforeSavedReceivedRVV, so R's RVV=P3R0 P4, P5 R goes on to
* save received RVV. R's new RVV=P5(3-6)R0
*
* When image (which contains P4, P5) arrives, it should fill the special exception
*
* This test will let the 2 operations happen before RIM, so the rvv will match between R and P
* and no gii will happen. In this way, the chunk will not contain the missing entries.
*/
@Test
public void testFillSpecialException2() throws Throwable {
prepareForEachTest();
final DiskStoreID memberP = getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
final long[] exceptionlist = {4, 5};
doOnePut(P, 1, "key1");
doOnePut(P, 2, "key2");
doOneDestroy(P, 3, "key1");
changeForceFullGII(R, false, true);
// 3
R.invoke(new SerializableRunnable() {
@Override
public void run() {
Mycallback myAfterCalculatedUnfinishedOps =
new Mycallback(GIITestHookType.AfterCalculatedUnfinishedOps, REGION_NAME);
InitialImageOperation.setGIITestHook(myAfterCalculatedUnfinishedOps);
}
});
// 5
R.invoke(new SerializableRunnable() {
@Override
public void run() {
Mycallback myAfterSavedReceivedRVV =
new Mycallback(GIITestHookType.AfterSavedReceivedRVV, REGION_NAME);
InitialImageOperation.setGIITestHook(myAfterSavedReceivedRVV);
}
});
closeCache(R);
AsyncInvocation async3 = createDistributedRegionAsync(R);
waitForCallbackStarted(R, GIITestHookType.AfterCalculatedUnfinishedOps);
doOneDestroy(P, 4, "key2");
doOnePut(P, 5, "key1");
R.invoke(() -> InitialImageOperation
.resetGIITestHook(GIITestHookType.AfterCalculatedUnfinishedOps, true));
waitForCallbackStarted(R, GIITestHookType.AfterSavedReceivedRVV);
R.invoke(
() -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterSavedReceivedRVV, true));
async3.join(MAX_WAIT);
waitForToVerifyRVV(R, memberP, 5, null, 0); // P's rvv=r5, gc=0
verifyDeltaSizeFromStats(R, 2, 1);
changeForceFullGII(R, false, true);
changeForceFullGII(P, false, true);
}
/*
* 1. after R5 becomes R4, regenrate another R5 2. verify stattic callback at provider 3. gii
* packing should wait for tombstone GC 4. gii packing should wait for other on-fly operations
*/
@Test
public void testHooks() throws Throwable {
prepareForEachTest();
final DiskStoreID memberP = getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS);
prepareCommonTestData(6);
VersionTag expect_tag = getVersionTag(R, "key5");
final long[] exceptionlist = {4, 5};
R.invoke(() -> GIIDeltaDUnitTest.slowGII(exceptionlist));
AsyncInvocation async1 = doOnePutAsync(R, 4, "key4");
waitForToVerifyRVV(R, memberR, 4, null, 0); // P's rvv=r4, gc=0
AsyncInvocation async2 = doOneDestroyAsync(R, 5, "key5");
waitForToVerifyRVV(R, memberR, 5, null, 0); // P's rvv=r5, gc=0
// P should have unfinished ops R4,R5, but they did not show up in exception list
waitForToVerifyRVV(P, memberR, 3, null, 0); // P's rvv=r3, gc=0
// define test hooks
// 1
R.invoke(new SerializableRunnable() {
@Override
public void run() {
Mycallback myBeforeRequestRVV =
new Mycallback(GIITestHookType.BeforeRequestRVV, REGION_NAME);
InitialImageOperation.setGIITestHook(myBeforeRequestRVV);
}
});
// 2
R.invoke(new SerializableRunnable() {
@Override
public void run() {
Mycallback myAfterRequestRVV = new Mycallback(GIITestHookType.AfterRequestRVV, REGION_NAME);
InitialImageOperation.setGIITestHook(myAfterRequestRVV);
}
});
// 3
R.invoke(new SerializableRunnable() {
@Override
public void run() {
Mycallback myAfterCalculatedUnfinishedOps =
new Mycallback(GIITestHookType.AfterCalculatedUnfinishedOps, REGION_NAME);
InitialImageOperation.setGIITestHook(myAfterCalculatedUnfinishedOps);
}
});
// 4
R.invoke(new SerializableRunnable() {
@Override
public void run() {
Mycallback myBeforeSavedReceivedRVV =
new Mycallback(GIITestHookType.BeforeSavedReceivedRVV, REGION_NAME);
InitialImageOperation.setGIITestHook(myBeforeSavedReceivedRVV);
}
});
// 5
R.invoke(new SerializableRunnable() {
@Override
public void run() {
Mycallback myAfterSavedReceivedRVV =
new Mycallback(GIITestHookType.AfterSavedReceivedRVV, REGION_NAME);
InitialImageOperation.setGIITestHook(myAfterSavedReceivedRVV);
}
});
// 6
R.invoke(new SerializableRunnable() {
@Override
public void run() {
Mycallback myAfterSentRequestImage =
new Mycallback(GIITestHookType.AfterSentRequestImage, REGION_NAME);
InitialImageOperation.setGIITestHook(myAfterSentRequestImage);
}
});
// 7
P.invoke(new SerializableRunnable() {
@Override
public void run() {
Mycallback myAfterReceivedRequestImage =
new Mycallback(GIITestHookType.AfterReceivedRequestImage, REGION_NAME);
InitialImageOperation.setGIITestHook(myAfterReceivedRequestImage);
}
});
// 8
P.invoke(new SerializableRunnable() {
@Override
public void run() {
Mycallback myDuringPackingImage =
new Mycallback(GIITestHookType.DuringPackingImage, REGION_NAME);
InitialImageOperation.setGIITestHook(myDuringPackingImage);
}
});
// 9
P.invoke(new SerializableRunnable() {
@Override
public void run() {
Mycallback myAfterSentImageReply =
new Mycallback(GIITestHookType.AfterSentImageReply, REGION_NAME);
InitialImageOperation.setGIITestHook(myAfterSentImageReply);
}
});
// 10
R.invoke(new SerializableRunnable() {
@Override
public void run() {
Mycallback myAfterReceivedImageReply =
new Mycallback(GIITestHookType.AfterReceivedImageReply, REGION_NAME);
InitialImageOperation.setGIITestHook(myAfterReceivedImageReply);
}
});
// 11
R.invoke(new SerializableRunnable() {
@Override
public void run() {
Mycallback myDuringApplyDelta =
new Mycallback(GIITestHookType.DuringApplyDelta, REGION_NAME);
InitialImageOperation.setGIITestHook(myDuringApplyDelta);
}
});
// 12
R.invoke(new SerializableRunnable() {
@Override
public void run() {
Mycallback myBeforeCleanExpiredTombstones =
new Mycallback(GIITestHookType.BeforeCleanExpiredTombstones, REGION_NAME);
InitialImageOperation.setGIITestHook(myBeforeCleanExpiredTombstones);
}
});
// 13
R.invoke(new SerializableRunnable() {
@Override
public void run() {
Mycallback myAfterSavedRVVEnd =
new Mycallback(GIITestHookType.AfterSavedRVVEnd, REGION_NAME);
InitialImageOperation.setGIITestHook(myAfterSavedRVVEnd);
}
});
// shutdown R and restart
waitForToVerifyRVV(R, memberP, 6, null, 0); // R's rvv=p6, gc=0
waitForToVerifyRVV(R, memberR, 5, null, 0); // R's rvv=r3, gc=0
byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME);
closeCache(R);
// let p7 to succeed
doOnePut(P, 7, "key1");
waitForToVerifyRVV(P, memberP, 7, null, 0); // P's rvv=p7, gc=0
waitForToVerifyRVV(P, memberR, 3, null, 0); // P's rvv=r3, gc=0
// now P's rvv=P7,R3, R's RVV=P6,R5
checkIfFullGII(P, REGION_NAME, R_rvv_bytes, false);
AsyncInvocation async3 = createDistributedRegionAsync(R);
// 1
waitForCallbackStarted(R, GIITestHookType.BeforeRequestRVV);
R.invoke(() -> InitialImageOperation.resetGIITestHook(GIITestHookType.BeforeRequestRVV, true));
// 2
waitForCallbackStarted(R, GIITestHookType.AfterRequestRVV);
R.invoke(() -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterRequestRVV, true));
// 3
waitForCallbackStarted(R, GIITestHookType.AfterCalculatedUnfinishedOps);
R.invoke(() -> InitialImageOperation
.resetGIITestHook(GIITestHookType.AfterCalculatedUnfinishedOps, true));
// 4
waitForCallbackStarted(R, GIITestHookType.BeforeSavedReceivedRVV);
R.invoke(
() -> InitialImageOperation.resetGIITestHook(GIITestHookType.BeforeSavedReceivedRVV, true));
// 5
waitForCallbackStarted(R, GIITestHookType.AfterSavedReceivedRVV);
R.invoke(
() -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterSavedReceivedRVV, true));
// 6
waitForCallbackStarted(R, GIITestHookType.AfterSentRequestImage);
R.invoke(
() -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterSentRequestImage, true));
// 7
waitForCallbackStarted(P, GIITestHookType.AfterReceivedRequestImage);
P.invoke(() -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterReceivedRequestImage,
true));
// 8
waitForCallbackStarted(P, GIITestHookType.DuringPackingImage);
P.invoke(
() -> InitialImageOperation.resetGIITestHook(GIITestHookType.DuringPackingImage, true));
// 9
waitForCallbackStarted(P, GIITestHookType.AfterSentImageReply);
P.invoke(
() -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterSentImageReply, true));
// 10
waitForCallbackStarted(R, GIITestHookType.AfterReceivedImageReply);
R.invoke(() -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterReceivedImageReply,
true));
// 11
waitForCallbackStarted(R, GIITestHookType.DuringApplyDelta);
R.invoke(() -> InitialImageOperation.resetGIITestHook(GIITestHookType.DuringApplyDelta, true));
// 12
waitForCallbackStarted(R, GIITestHookType.BeforeCleanExpiredTombstones);
R.invoke(() -> InitialImageOperation
.resetGIITestHook(GIITestHookType.BeforeCleanExpiredTombstones, true));
// 13
waitForCallbackStarted(R, GIITestHookType.AfterSavedRVVEnd);
R.invoke(() -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterSavedRVVEnd, true));
async3.join(MAX_WAIT);
waitForToVerifyRVV(R, memberP, 7, null, 0); // P's rvv=p7, gc=0
waitForToVerifyRVV(R, memberR, 3, exceptionlist, 0); // P's rvv=r3, gc=0
RegionVersionVector p_rvv = getRVV(P);
RegionVersionVector r_rvv = getRVV(R);
assertSameRVV(p_rvv, r_rvv); // after gii, rvv should be the same
// full gii chunk has 4 entries: key1,2(tombstone),3,5
// delta gii has 2 entry: p7 (key1) and (key5(T) which is unfinished operation)
verifyDeltaSizeFromStats(R, 2, 1);
// verify unfinished op for key5 is revoked
waitToVerifyKey(R, "key5", generateValue(R));
VersionTag tag = getVersionTag(R, "key5");
assertTrue(expect_tag.equals(tag));
// P will receive R6 and have exception R6(3-6)
doOnePut(R, 6, "key1"); // r6 will pass
waitForToVerifyRVV(R, memberR, 6, exceptionlist, 0); // R's rvv=r6, gc=0
waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6(3-6), gc=0
P.invoke(() -> InitialImageOperation.resetAllGIITestHooks());
}
/**
* vm0 and vm1 are peers, each holds a DR. create some exception list. Then shutdown R. Before
* GII, P's RVV is P7,R6(3-6), RVVGC is P0,R0; R's RVV is P3,R6, RVVGC is P0,R0 vm1 becomes
* offline then restarts. Use testHook to pause the GII, then do tombstone GC at P only. The
* deltaGII should send correct tombstonedelta to R, revoke unfinished opeation R4,R5
*
* There's member T doing GII from P at the same time.
*
* In this test, GII thread will get the GIILock before tombstone GC, so tombstone GC should wait
* for all GIIs to finish
*/
@Test
public void testTombstoneGCInMiddleOfGII() throws Throwable {
prepareForEachTest();
final DiskStoreID memberP = getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
final long[] exceptionlist = {4, 5};
Host host = getHost(0);
VM T = host.getVM(2);
createDistributedRegion(T);
final DiskStoreID memberT = getMemberID(T);
closeCache(T);
assertEquals(0, SLOW_DISTRIBUTION_MS);
prepareCommonTestData(3);
waitForToVerifyRVV(P, memberP, 3, null, 0); // P's rvv=p3, gc=0
VersionTag expect_tag = getVersionTag(R, "key5");
createUnfinishedOperationsR4R5();
waitForToVerifyRVV(R, memberP, 3, null, 0); // R's rvv=p3, gc=0
byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME);
closeCache(R);
// p4-7 only apply at P
doOneDestroy(P, 4, "key2");
doOnePut(P, 5, "key1");
doOnePut(P, 6, "key3");
doOnePut(P, 7, "key1");
// add test hook
// 8
P.invoke(new SerializableRunnable() {
@Override
public void run() {
Mycallback myDuringPackingImage =
new Mycallback(DuringPackingImage, REGION_NAME);
setGIITestHook(myDuringPackingImage);
}
});
checkIfFullGII(P, REGION_NAME, R_rvv_bytes, false);
// restart R and gii, it will be blocked at test hook
AsyncInvocation async3 = createDistributedRegionAsync(R);
// restart R and gii, it will be blocked at test hook
AsyncInvocation async4 = createDistributedRegionAsync(T);
// 8
waitForCallbackStarted(P, DuringPackingImage);
WaitCriterion ev = new WaitCriterion() {
@Override
public boolean done() {
int count = getDeltaGIICount(P);
return (count == 2);
}
@Override
public String description() {
return null;
}
};
GeodeAwaitility.await().untilAsserted(ev);
int count = getDeltaGIICount(P);
assertEquals(2, count);
// force tombstone GC to let RVVGC to become P4:R0, but R already sent its old RVV/RVVGC over
// this tombstone GC happens AFTER GII thread got the GIILock, so it will be ignored since GII
// is ongoing
changeTombstoneTimout(R, MAX_WAIT);
changeTombstoneTimout(P, MAX_WAIT);
changeTombstoneTimout(T, MAX_WAIT);
pause((int) MAX_WAIT);
forceGC(P, 2);
waitForToVerifyRVV(P, memberP, 7, null, 0); // P's rvv=p7, gc=0
// let GII continue
P.invoke(
() -> resetGIITestHook(DuringPackingImage, false));
P.invoke(
() -> resetGIITestHook(DuringPackingImage, true));
WaitCriterion ev2 = new WaitCriterion() {
@Override
public boolean done() {
int count = getDeltaGIICount(P);
return (count == 0);
}
@Override
public String description() {
return null;
}
};
GeodeAwaitility.await().untilAsserted(ev2);
count = getDeltaGIICount(P);
assertEquals(0, count);
verifyTombstoneExist(P, "key2", true, true); // expect key2 is still tombstone during and after
// GIIs
verifyTombstoneExist(R, "key2", true, true); // expect key2 is still tombstone during and after
// GIIs
verifyTombstoneExist(T, "key2", true, true); // expect key2 is still tombstone during and after
// GIIs
forceGC(P, 1); // trigger to GC the tombstones in expired queue
async3.join(MAX_WAIT);
async4.join(MAX_WAIT);
// after GII, tombstone GC happened
waitForToVerifyRVV(P, memberP, 7, null, 4); // P's rvv=p6, gc=4
waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6(3-6), gc=0
waitForToVerifyRVV(R, memberP, 7, null, 4); // R's rvv=p7, gc=4
waitForToVerifyRVV(R, memberR, 6, exceptionlist, 0); // R's rvv=r6, gc=0
verifyTombstoneExist(P, "key2", false, true); // expect tombstone key2 is GCed at P
verifyTombstoneExist(R, "key2", false, true); // expect tombstone key2 is GCed at R
verifyTombstoneExist(T, "key2", false, true); // T got everything from P
// do a put from T
doOnePut(T, 1, "key1");
RegionVersionVector p_rvv = getRVV(P);
RegionVersionVector r_rvv = getRVV(R);
RegionVersionVector t_rvv = getRVV(R);
assertSameRVV(p_rvv, r_rvv);
assertSameRVV(t_rvv, r_rvv);
// If fullGII, the key size in gii chunk is 4, i.e. key1,key3,key5,key2 is a tombstone.
// If delta GII, it should be 4, (key1, key2, key3) and (key5(T) which is unfinished operation)
verifyDeltaSizeFromStats(R, 4, 1);
// verify unfinished op for key5 is revoked
waitToVerifyKey(R, "key5", generateValue(R));
VersionTag tag = getVersionTag(R, "key5");
assertTrue(expect_tag.equals(tag));
// P.invoke(() -> InitialImageOperation.resetAllGIITestHooks());
}
/**
* vm0 and vm1 are peers, each holds a DR. Let P and R have the same RVV and RVVGC: P7,R6, RVVGC
* is P0,R0. vm1 becomes offline then restarts. Use testHook to pause the GII, then do tombstone
* GC triggered by expireBatch (not by forceGC) at R only. The tombstone GC will be executed at R,
* but igored at P. The deltaGII should send nothing to R since the RVVs are the same. So after
* GII, P and R will have different tombstone number. But P's tombstones should be expired.
*/
@Test
public void testExpiredTombstoneSkippedAtProviderOnly() throws Throwable {
prepareForEachTest();
final DiskStoreID memberP = getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
assertEquals(0, SLOW_DISTRIBUTION_MS);
prepareCommonTestData(6);
// let r4,r5,r6 to succeed
doOnePut(R, 4, "key4");
doOneDestroy(R, 5, "key5");
doOnePut(R, 6, "key1");
waitForToVerifyRVV(R, memberP, 6, null, 0); // P's rvv=p6, gc=0
waitForToVerifyRVV(R, memberR, 6, null, 0); // P's rvv=r6, gc=0
// now the rvv and rvvgc at P and R should be the same
// save R's rvv in byte array, check if it will be fullGII
byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME);
// shutdown R and restart
closeCache(R);
// let p7 to succeed
doOnePut(P, 7, "key1");
waitForToVerifyRVV(P, memberP, 7, null, 0); // P's rvv=p7, gc=0
waitForToVerifyRVV(P, memberR, 6, null, 0); // P's rvv=r6, gc=0
// add test hook
P.invoke(new SerializableRunnable() {
@Override
public void run() {
Mycallback myDuringPackingImage =
new Mycallback(DuringPackingImage, REGION_NAME);
setGIITestHook(myDuringPackingImage);
}
});
checkIfFullGII(P, REGION_NAME, R_rvv_bytes, false);
// restart R and gii, it will be blocked at test hook
AsyncInvocation async3 = createDistributedRegionAsync(R);
// 8
waitForCallbackStarted(P, DuringPackingImage);
WaitCriterion ev = new WaitCriterion() {
@Override
public boolean done() {
int count = getDeltaGIICount(P);
return (count == 1);
}
@Override
public String description() {
return null;
}
};
GeodeAwaitility.await().untilAsserted(ev);
int count = getDeltaGIICount(P);
assertEquals(1, count);
// let tombstone expired at R to trigger tombstoneGC.
// Wait for tombstone is GCed at R, but still exists in P
changeTombstoneTimout(R, MAX_WAIT);
changeTombstoneTimout(P, MAX_WAIT);
pause((int) MAX_WAIT);
forceGC(R, 3);
forceGC(P, 3);
// let GII continue
P.invoke(
() -> resetGIITestHook(DuringPackingImage, true));
async3.join(MAX_WAIT * 2);
count = getDeltaGIICount(P);
assertEquals(0, count);
verifyDeltaSizeFromStats(R, 1, 1); // deltaGII, key1 in delta
// tombstone key2, key5 should be GCed at R
verifyTombstoneExist(R, "key2", false, false);
verifyTombstoneExist(R, "key5", false, false);
// tombstone key2, key5 should still exist and expired at P
verifyTombstoneExist(P, "key2", true, true);
verifyTombstoneExist(P, "key5", true, true);
RegionVersionVector p_rvv = getRVV(P);
RegionVersionVector r_rvv = getRVV(R);
out.println("GGG:p_rvv=" + p_rvv.fullToString() + ":r_rvv=" + r_rvv.fullToString());
waitForToVerifyRVV(R, memberP, 7, null, 4); // R's rvv=p7, gc=4
waitForToVerifyRVV(R, memberR, 6, null, 5); // R's rvv=r6, gc=5
waitForToVerifyRVV(P, memberP, 7, null, 0); // P's rvv=p7, gc=0
waitForToVerifyRVV(P, memberR, 6, null, 0); // P's rvv=r6, gc=0
}
/**
* vm0 and vm1 are peers, each holds a DR. Each does a few operations to make RVV=P7,R6,
* RVVGC=P4,R0 for both members. vm1 becomes offline then restarts. The deltaGII should only
* exchange RVV. No need to send data from vm0 to vm1.
*/
@Test
public void testRequesterHasHigherRVVGC() throws Throwable {
prepareForEachTest();
final DiskStoreID memberP = getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS);
prepareCommonTestData(6);
// let r4,r5,r6 to succeed
doOnePut(R, 4, "key4");
doOneDestroy(R, 5, "key5");
doOnePut(R, 6, "key1");
// let P pretends to be doing GII, to skip gcTombstone
forceAddGIICount(P);
changeTombstoneTimout(R, MAX_WAIT);
changeTombstoneTimout(P, MAX_WAIT);
Wait.pause((int) MAX_WAIT);
forceGC(R, 3);
waitForToVerifyRVV(P, memberR, 6, null, 0); // P's rvv=r6, gc=0
waitForToVerifyRVV(P, memberR, 6, null, 0); // P's rvv=r6, gc=0
waitForToVerifyRVV(R, memberP, 6, null, 4); // P's rvv=p6, gc=4
waitForToVerifyRVV(R, memberR, 6, null, 5); // R's rvv=r6, gc=5
verifyTombstoneExist(R, "key5", false, false);
verifyTombstoneExist(P, "key5", true, true);
byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME);
closeCache(R);
// let p7 to succeed
doOnePut(P, 7, "key1");
// shutdown R and restart
checkIfFullGII(P, REGION_NAME, R_rvv_bytes, false);
createDistributedRegion(R);
waitForToVerifyRVV(R, memberP, 7, null, 4); // P's rvv=p7, gc=4
waitForToVerifyRVV(R, memberR, 6, null, 5); // P's rvv=r6, gc=5
waitForToVerifyRVV(P, memberP, 7, null, 0); // P's rvv=r7, gc still 0
waitForToVerifyRVV(P, memberR, 6, null, 0); // P's rvv=r6, gc still 0
// If fullGII, the key size in gii chunk is 5, i.e. key1,key2(T),key3,key4,key5(T).
// If deltaGII, the key size is 1, i.e. P7 (key1)
verifyDeltaSizeFromStats(R, 1, 1);
verifyTombstoneExist(R, "key5", false, false);
verifyTombstoneExist(P, "key5", true, true);
}
/**
* P and R are peers, each holds a DR. Each does a few operations to make RVV=P7,R6, RVVGC=P0,R0
* for both members. P8 is clear() operation. After that, R offline. Run P9 is a put. Restart R. R
* will do deltaGII to get P9 as delta
*/
@Test
public void testDeltaGIIAfterClear() throws Throwable {
prepareForEachTest();
final DiskStoreID memberP = getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
final long[] exceptionlist = {4, 5};
assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS);
prepareCommonTestData(6);
// let r4,r5,r6 to succeed
doOnePut(R, 4, "key4");
doOneDestroy(R, 5, "key5");
doOnePut(R, 6, "key1");
doOnePut(P, 7, "key1");
waitForToVerifyRVV(P, memberP, 7, null, 0); // P's rvv=p7, gc=0
waitForToVerifyRVV(P, memberR, 6, null, 0); // P's rvv=r6, gc=0
waitForToVerifyRVV(R, memberP, 7, null, 0); // R's rvv=P7, gc=0
waitForToVerifyRVV(R, memberR, 6, null, 0); // R's rvv=r6, gc=0
// Note: since R is still online, clear will do flush message which will be blocked by the
// test CDL (to create unfinished operation). So in this test, no exception
doOneClear(P, 8);
// clear() increased P's version with 1 to P8
// after clear, P and R's RVVGC == RVV
waitForToVerifyRVV(P, memberP, 8, null, 8); // P's rvv=p8, gc=8
waitForToVerifyRVV(P, memberR, 6, null, 6); // P's rvv=r6, gc=6
waitForToVerifyRVV(R, memberP, 8, null, 8); // R's rvv=p8, gc=8
waitForToVerifyRVV(R, memberR, 6, null, 6); // R's rvv=r6, gc=6
// shutdown R
byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME);
closeCache(R);
// do a put at P to get some delta
doOnePut(P, 9, "key3");
waitForToVerifyRVV(P, memberP, 9, null, 8);
waitForToVerifyRVV(P, memberR, 6, null, 6);
// restart R to deltaGII
checkIfFullGII(P, REGION_NAME, R_rvv_bytes, false);
// shutdown P and restart
closeCache(P);
createDistributedRegion(P);
waitForToVerifyRVV(P, memberP, 9, null, 8);
waitForToVerifyRVV(P, memberR, 6, null, 6);
createDistributedRegion(R);
waitForToVerifyRVV(R, memberP, 9, null, 8);
waitForToVerifyRVV(R, memberR, 6, null, 6);
RegionVersionVector p_rvv = getRVV(P);
RegionVersionVector r_rvv = getRVV(R);
assertSameRVV(p_rvv, r_rvv); // after gii, rvv should be the same
verifyDeltaSizeFromStats(R, 1, 1);
}
/**
* P and R are peers, each holds a DR. Each does a few operations to make RVV=P6,R6, RVVGC=P0,R0
* for both members. R off line, then run P7. Restart R. It will trigger deltaGII to chunk entry
* P7(key1). After that, do clear(). Make sure R should not contain key1 after GII.
*/
@Test
public void testClearAfterChunkEntries() throws Throwable {
prepareForEachTest();
final DiskStoreID memberP = getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
final long[] exceptionlist = {4, 5};
assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS);
prepareCommonTestData(6);
// let r4,r5,r6 to succeed
doOnePut(R, 4, "key4");
doOneDestroy(R, 5, "key5");
doOnePut(R, 6, "key1");
waitForToVerifyRVV(P, memberP, 6, null, 0); // P's rvv=p6, gc=0
waitForToVerifyRVV(P, memberR, 6, null, 0); // P's rvv=r6, gc=0
waitForToVerifyRVV(R, memberP, 6, null, 0); // R's rvv=P6, gc=0
waitForToVerifyRVV(R, memberR, 6, null, 0); // R's rvv=r6, gc=0
// set tesk hook
R.invoke(new SerializableRunnable() {
@Override
public void run() {
Mycallback myAfterReceivedImageReply =
new Mycallback(GIITestHookType.AfterReceivedImageReply, REGION_NAME);
InitialImageOperation.setGIITestHook(myAfterReceivedImageReply);
}
});
// shutdown R
byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME);
closeCache(R);
// key1 will be the delta
doOnePut(P, 7, "key1");
// retart R
checkIfFullGII(P, REGION_NAME, R_rvv_bytes, false);
AsyncInvocation async3 = createDistributedRegionAsync(R);
// when chunk arrived, do clear()
waitForCallbackStarted(R, GIITestHookType.AfterReceivedImageReply);
doOneClear(P, 8);
R.invoke(() -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterReceivedImageReply,
true));
async3.getResult(MAX_WAIT);
// clear() increased P's version with 1 to P8
// after clear, P and R's RVVGC == RVV
waitForToVerifyRVV(P, memberP, 8, null, 8); // P's rvv=r8, gc=8
waitForToVerifyRVV(P, memberR, 6, null, 6); // P's rvv=r6, gc=6
// retart R again to do fullGII
closeCache(R);
createDistributedRegion(R);
RegionVersionVector p_rvv = getRVV(P);
RegionVersionVector r_rvv = getRVV(R);
assertSameRVV(p_rvv, r_rvv); // after gii, rvv should be the same
waitForToVerifyRVV(R, memberP, 8, null, 8); // R's rvv=r8, gc=8
waitForToVerifyRVV(R, memberR, 6, null, 6); // R's rvv=r6, gc=6
waitToVerifyKey(P, "key1", null);
waitToVerifyKey(R, "key1", null);
verifyDeltaSizeFromStats(R, 0, 1);
}
/**
* P and R are peers, each holds a DR. Each does a few operations to make RVV=P6,R6, RVVGC=P0,R0
* for both members. R off line, then run P7. Restart R. When P's RVV arrives, do clear(). It
* should trigger fullGII. Make sure R should not contain key1 after GII.
*/
@Test
public void testClearAfterSavedRVV() throws Throwable {
prepareForEachTest();
final DiskStoreID memberP = getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
final long[] exceptionlist = {4, 5};
assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS);
prepareCommonTestData(6);
// let r4,r5,r6 to succeed
doOnePut(R, 4, "key4");
doOneDestroy(R, 5, "key5");
doOnePut(R, 6, "key1");
waitForToVerifyRVV(P, memberP, 6, null, 0); // P's rvv=p6, gc=0
waitForToVerifyRVV(P, memberR, 6, null, 0); // P's rvv=r6, gc=0
waitForToVerifyRVV(R, memberP, 6, null, 0); // R's rvv=P6, gc=0
waitForToVerifyRVV(R, memberR, 6, null, 0); // R's rvv=r6, gc=0
// set tesk hook
R.invoke(new SerializableRunnable() {
@Override
public void run() {
Mycallback myAfterSavedReceivedRVV =
new Mycallback(GIITestHookType.AfterSavedReceivedRVV, REGION_NAME);
InitialImageOperation.setGIITestHook(myAfterSavedReceivedRVV);
}
});
// shutdown R
closeCache(R);
// key1 will be the delta
doOnePut(P, 7, "key1");
// retart R
AsyncInvocation async3 = createDistributedRegionAsync(R);
// when chunk arrived, do clear()
waitForCallbackStarted(R, GIITestHookType.AfterSavedReceivedRVV);
doOneClear(P, 8);
R.invoke(
() -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterSavedReceivedRVV, true));
async3.join(MAX_WAIT);
// clear() increased P's version with 1 to P8
// after clear, P and R's RVVGC == RVV
waitForToVerifyRVV(P, memberP, 8, null, 8); // P's rvv=r8, gc=8
waitForToVerifyRVV(P, memberR, 6, null, 6); // P's rvv=r6, gc=6
RegionVersionVector p_rvv = getRVV(P);
RegionVersionVector r_rvv = getRVV(R);
assertSameRVV(p_rvv, r_rvv); // after gii, rvv should be the same
waitForToVerifyRVV(R, memberP, 8, null, 8); // R's rvv=r8, gc=8
waitForToVerifyRVV(R, memberR, 6, null, 6); // R's rvv=r6, gc=6
waitToVerifyKey(P, "key1", null);
waitToVerifyKey(R, "key1", null);
verifyDeltaSizeFromStats(R, 0, 0);
}
/**
* P and R are peers, each holds a DR. Each does a few operations to make RVV=P7,R6, RVVGC=P0,R0
* for both members. R offline, then P8 is clear() operation. Run P9 is a put. Restart R. R will
* do fullGII since R missed a clear
*/
@Test
public void testFullGIIAfterClear() throws Throwable {
prepareForEachTest();
final DiskStoreID memberP = getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
final long[] exceptionlist = {4, 5};
assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS);
prepareCommonTestData(6);
createUnfinishedOperationsR4R5();
doOnePut(P, 7, "key1");
waitForToVerifyRVV(P, memberP, 7, null, 0); // P's rvv=p6, gc=0
waitForToVerifyRVV(P, memberR, 6, exceptionlist, 0); // P's rvv=r6(3-6), gc=0
waitForToVerifyRVV(R, memberP, 7, null, 0); // R's rvv=P7, gc=0
waitForToVerifyRVV(R, memberR, 6, null, 0); // R's rvv=r6, gc=0
// shutdown R before clear
byte[] R_rvv_bytes = getRVVByteArray(R, REGION_NAME);
closeCache(R);
doOneClear(P, 8);
// clear() increased P's version with 1 to P8
// after clear, P and R's RVVGC == RVV, no more exception
waitForToVerifyRVV(P, memberP, 8, null, 8); // P's rvv=r6, gc=8
waitForToVerifyRVV(P, memberR, 6, null, 6); // P's rvv=r6, gc=6
// do a put at P to get some delta
doOnePut(P, 9, "key3");
waitForToVerifyRVV(P, memberP, 9, null, 8);
waitForToVerifyRVV(P, memberR, 6, null, 6);
// restart R to deltaGII
checkIfFullGII(P, REGION_NAME, R_rvv_bytes, true);
createDistributedRegion(R);
waitForToVerifyRVV(R, memberP, 9, null, 8);
waitForToVerifyRVV(R, memberR, 6, null, 6);
RegionVersionVector p_rvv = getRVV(P);
RegionVersionVector r_rvv = getRVV(R);
assertSameRVV(p_rvv, r_rvv); // after gii, rvv should be the same
verifyDeltaSizeFromStats(R, 1, 0);
}
/**
* vm0 and vm1 are peers, each holds a DR. create some exception list. Before GII, P's RVV is
* P7,R6(3-6), R's RVV is P6,R6, RVVGC are both P4,R0 By changing MAX_UNFINISHED_OPERATIONS to be
* 1, 2. It should be fullGII then deltaGII.
*/
@Test
public void testFullGIITriggeredByTooManyUnfinishedOps() throws Throwable {
prepareForEachTest();
final DiskStoreID memberP = getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
final long[] exceptionlist = {4, 5};
assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS);
prepareCommonTestData(6);
// force tombstone GC to let RVVGC to become P4:R0
forceGC(P, 2);
waitForToVerifyRVV(P, memberP, 6, null, 4); // P's rvv=p6, gc=4
waitForToVerifyRVV(P, memberR, 3, null, 0); // P's rvv=r3, gc=0
createUnfinishedOperationsR4R5();
// now P's cache still only has key1, key3, key5
closeCache(R);
// p7 only apply at P
doOnePut(P, 7, "key1");
// restart and gii, it should be fullGII because number of unfinished operations exceeds limit
changeUnfinishedOperationLimit(R, 1);
createDistributedRegion(R);
// If fullGII, the key size in gii chunk is 3, i.e. key1,key3,key5. key2 is GCed.
// If delta GII, the key size should be 1, i.e. P7(key1)
verifyDeltaSizeFromStats(R, 3, 0);
}
/**
* P and R are peers, each holds a DR. Each does a few operations to make RVV=P6,R6, RVVGC=P0,R0
* for both members. R off line, then run P7. Restart R. When P's RVV arrives, restart R. It
* should trigger fullGII.
*/
@Test
public void testRestartWithOnlyGIIBegion() throws Throwable {
prepareForEachTest();
final DiskStoreID memberP = getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
final long[] exceptionlist = {4, 5};
assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS);
prepareCommonTestData(6);
// let r4,r5,r6 to succeed
doOnePut(R, 4, "key4");
doOneDestroy(R, 5, "key5");
doOnePut(R, 6, "key1");
waitForToVerifyRVV(P, memberP, 6, null, 0); // P's rvv=p6, gc=0
waitForToVerifyRVV(P, memberR, 6, null, 0); // P's rvv=r6, gc=0
waitForToVerifyRVV(R, memberP, 6, null, 0); // R's rvv=P6, gc=0
waitForToVerifyRVV(R, memberR, 6, null, 0); // R's rvv=r6, gc=0
// set tesk hook
R.invoke(new SerializableRunnable() {
@Override
public void run() {
Mycallback myAfterSavedReceivedRVV =
new Mycallback(GIITestHookType.AfterSavedReceivedRVV, REGION_NAME);
InitialImageOperation.setGIITestHook(myAfterSavedReceivedRVV);
}
});
// shutdown R
closeCache(R);
// key1 will be the delta
doOnePut(P, 7, "key1");
// retart R
AsyncInvocation async3 = createDistributedRegionAsync(R);
// when chunk arrived, do clear()
waitForCallbackStarted(R, GIITestHookType.AfterSavedReceivedRVV);
// kill and restart R
closeCache(R);
async3.join(MAX_WAIT);
R.invoke(
() -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterSavedReceivedRVV, true));
createDistributedRegion(R);
waitForToVerifyRVV(P, memberP, 7, null, 0); // P's rvv=r8, gc=0
waitForToVerifyRVV(P, memberR, 6, null, 0); // P's rvv=r6, gc=0
RegionVersionVector p_rvv = getRVV(P);
RegionVersionVector r_rvv = getRVV(R);
assertSameRVV(p_rvv, r_rvv); // after gii, rvv should be the same
waitForToVerifyRVV(R, memberP, 7, null, 0); // R's rvv=r7, gc=0
waitForToVerifyRVV(R, memberR, 6, null, 0); // R's rvv=r6, gc=0
// In fullGII, the key size in gii chunk is 2. They are: key1, key2, key3, key4, key5
verifyDeltaSizeFromStats(R, 5, 0);
}
/**
* Test the case where a member has an untrusted RVV and still initializes from the local data.
* See bug 48066
*
*/
@Test
public void testRecoverFromUntrustedRVV() throws Throwable {
prepareForEachTest();
final DiskStoreID memberP = getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS);
prepareCommonTestData(6);
// let r4,r5,r6 to succeed
doOnePut(R, 4, "key4");
doOneDestroy(R, 5, "key5");
doOnePut(R, 6, "key1");
waitForToVerifyRVV(P, memberP, 6, null, 0); // P's rvv=p6, gc=0
waitForToVerifyRVV(P, memberR, 6, null, 0); // P's rvv=r6, gc=0
waitForToVerifyRVV(R, memberP, 6, null, 0); // R's rvv=P6, gc=0
waitForToVerifyRVV(R, memberR, 6, null, 0); // R's rvv=r6, gc=0
// set tesk hook
R.invoke(new SerializableRunnable() {
@Override
public void run() {
// Add hooks before and after receiving the RVV
Mycallback myBeforeSavedReceivedRVV =
new Mycallback(GIITestHookType.BeforeSavedReceivedRVV, REGION_NAME);
InitialImageOperation.setGIITestHook(myBeforeSavedReceivedRVV);
Mycallback myAfterSavedReceivedRVV =
new Mycallback(GIITestHookType.AfterSavedReceivedRVV, REGION_NAME);
InitialImageOperation.setGIITestHook(myAfterSavedReceivedRVV);
}
});
// shutdown R
closeCache(R);
// retart R
AsyncInvocation async3 = createDistributedRegionAsync(R);
// when chunk arrived, do clear()
waitForCallbackStarted(R, GIITestHookType.BeforeSavedReceivedRVV);
// Before R saves the RVV, do a put. This will be recording in Rs region
// and RVV, but it will be wiped out in the RVV when R applies the RVV
// from P.
doOnePut(P, 7, "key1");
R.invoke(
() -> InitialImageOperation.resetGIITestHook(GIITestHookType.BeforeSavedReceivedRVV, true));
// Wait until the new RVV is applied
waitForCallbackStarted(R, GIITestHookType.AfterSavedReceivedRVV);
// destroy the region on P (which will force R to recover with it's own
// data
destroyRegion(P);
// Allow the GII to continue.
R.invoke(
() -> InitialImageOperation.resetGIITestHook(GIITestHookType.AfterSavedReceivedRVV, true));
async3.join(MAX_WAIT);
// createDistributedRegion(R);
// Make sure that Rs RVV now reflects the update from P
waitForToVerifyRVV(R, memberP, 7, null, 0); // P's rvv=r8, gc=0
waitForToVerifyRVV(R, memberR, 6, null, 0); // P's rvv=r6, gc=0
}
/**
* Test case to make sure that if a tombstone GC occurs during a full GII, we still have the
* correct RVV on the GII recipient at the end.
*
*/
@Test
public void testTombstoneGCDuringFullGII() throws Throwable {
prepareForEachTest();
// Create the region in 1 more VM to to a tombstone GC.
VM vm2 = Host.getHost(0).getVM(2);
createDistributedRegion(vm2);
final DiskStoreID memberP = getMemberID(P);
final DiskStoreID memberR = getMemberID(R);
assertEquals(0, DistributedCacheOperation.SLOW_DISTRIBUTION_MS);
prepareCommonTestData(6);
// All members should have "key5" at this point
// shutdown R
closeCache(R);
final VM vmR = R;
// Destroy key5, this will leave a tombstone
doOneDestroy(P, 7, "key5");
// Set tesk hook so that R will pause GII after getting the RVV
R.invoke(new SerializableRunnable() {
@Override
public void run() {
// Add hooks before and after receiving the RVV
Mycallback myAfterSavedReceivedRVV =
new Mycallback(GIITestHookType.AfterCalculatedUnfinishedOps, REGION_NAME);
InitialImageOperation.setGIITestHook(myAfterSavedReceivedRVV);
}
});
// Set a trigger in vm2 so that it will start up R after determining
// the recipients for a tombstone GC message. vm2 will wait until
// R has already received the RVV before sending the message.
vm2.invoke(new SerializableRunnable() {
@Override
public void run() {
DistributionMessageObserver.setInstance(new DistributionMessageObserver() {
@Override
public void beforeSendMessage(ClusterDistributionManager dm,
DistributionMessage message) {
if (message instanceof TombstoneMessage
&& ((TombstoneMessage) message).regionPath.contains(REGION_NAME)) {
System.err.println("DAN DEBUG about to send tombstone message, starting up R - "
+ message.getSender());
AsyncInvocation async3 = createDistributedRegionAsync(vmR);
// Wait for R to finish requesting the RVV before letting the tombstone GC proceeed.
waitForCallbackStarted(vmR, GIITestHookType.AfterCalculatedUnfinishedOps);
System.err.println("DAN DEBUG R has received the RVV, sending tombstone message");
DistributionMessageObserver.setInstance(null);
}
}
});
}
});
P.invoke(new SerializableRunnable() {
@Override
public void run() {
DistributionMessageObserver.setInstance(new DistributionMessageObserver() {
@Override
public void afterProcessMessage(ClusterDistributionManager dm,
DistributionMessage message) {
if (message instanceof TombstoneMessage
&& ((TombstoneMessage) message).regionPath.contains(REGION_NAME)) {
System.err.println(
"DAN DEBUG P has processed the tombstone message, allowing R to proceed with the GII");
vmR.invoke(() -> InitialImageOperation
.resetGIITestHook(GIITestHookType.AfterCalculatedUnfinishedOps, true));
DistributionMessageObserver.setInstance(null);
}
}
});
}
});
// Force tombstone GC, this will trigger the R to be started, etc.
vm2.invoke(new SerializableRunnable() {
@Override
public void run() {
GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
try {
cache.getTombstoneService().forceBatchExpirationForTests(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// Wait for P to perform the tombstone GC
waitForToVerifyRVV(P, memberP, 7, null, 7);
System.err
.println("DAN DEBUG P has finished the tombstone GC, waiting for R to get the correct RVV");
// Make sure that Rs RVV now reflects the update from P
waitForToVerifyRVV(R, memberP, 7, null, 7); // P's rvv=r7, gc=7
}
/**
* Returns region attributes for a <code>GLOBAL</code> region
*/
protected RegionAttributes getRegionAttributes() {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
factory.setConcurrencyChecksEnabled(true);
return factory.create();
}
protected void createDistributedRegion(VM vm) {
AsyncInvocation future = createDistributedRegionAsync(vm);
try {
future.join(MAX_WAIT);
} catch (InterruptedException e) {
Assert.fail("Create region is interrupted", e);
}
if (future.isAlive()) {
fail("Region not created within" + MAX_WAIT);
}
if (future.exceptionOccurred()) {
throw new RuntimeException(future.getException());
}
}
protected AsyncInvocation createDistributedRegionAsync(VM vm) {
SerializableRunnable createRegion = new SerializableRunnable("Create Region") {
@Override
public void run() {
try {
String value =
System.getProperty(DistributionConfig.GEMFIRE_PREFIX + "no-flush-on-close");
assertNull(value);
RegionFactory f = getCache().createRegionFactory(getRegionAttributes());
// CCRegion = (LocalRegion)f.create(REGION_NAME);
LocalRegion lr = (LocalRegion) f.create(REGION_NAME);
LogWriterUtils.getLogWriter()
.info("In createDistributedRegion, using hydra.getLogWriter()");
LogWriterUtils.getLogWriter()
.fine("Unfinished Op limit=" + InitialImageOperation.MAXIMUM_UNFINISHED_OPERATIONS);
} catch (CacheException ex) {
Assert.fail("While creating region", ex);
}
}
};
return vm.invokeAsync(createRegion);
}
protected void closeCache(VM vm) {
SerializableRunnable close = new SerializableRunnable() {
@Override
public void run() {
try {
Cache cache = getCache();
System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "no-flush-on-close", "true");
cache.close();
} finally {
System.getProperties().remove(DistributionConfig.GEMFIRE_PREFIX + "no-flush-on-close");
}
}
};
vm.invoke(close);
}
protected void destroyRegion(VM vm) {
SerializableRunnable destroy = new SerializableRunnable() {
@Override
public void run() {
LocalRegion lr = (LocalRegion) getCache().getRegion(REGION_NAME);
lr.localDestroyRegion();
}
};
vm.invoke(destroy);
}
protected void changeUnfinishedOperationLimit(VM vm, final int value) {
SerializableRunnable change = new SerializableRunnable() {
@Override
public void run() {
InitialImageOperation.MAXIMUM_UNFINISHED_OPERATIONS = value;
}
};
vm.invoke(change);
}
protected void changeTombstoneTimout(VM vm, final long value) {
SerializableRunnable change = new SerializableRunnable() {
@Override
public void run() {
TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT = value;
}
};
vm.invoke(change);
}
protected void changeForceFullGII(VM vm, final boolean value, final boolean checkOnly) {
SerializableRunnable change = new SerializableRunnable() {
@Override
public void run() {
if (checkOnly) {
assertEquals(value, InitialImageOperation.FORCE_FULL_GII);
} else {
InitialImageOperation.FORCE_FULL_GII = value;
}
}
};
vm.invoke(change);
}
protected void removeSystemPropertiesInVM(VM vm, final String prop) {
SerializableRunnable change = new SerializableRunnable() {
@Override
public void run() {
LogWriterUtils.getLogWriter()
.info("Current prop setting: " + prop + "=" + System.getProperty(prop));
System.getProperties().remove(prop);
LogWriterUtils.getLogWriter().info(prop + "=" + System.getProperty(prop));
}
};
vm.invoke(change);
}
protected void verifyDeltaSizeFromStats(VM vm, final int expectedKeyNum,
final int expectedDeltaGIINum) {
SerializableRunnable verify = new SerializableRunnable() {
@Override
public void run() {
Cache cache = getCache();
// verify from CachePerfStats that certain amount of keys in delta
LocalRegion lr = (LocalRegion) getCache().getRegion(REGION_NAME);
CachePerfStats stats = lr.getRegionPerfStats();
// we saved GII completed count in RegionPerfStats only
int size = stats.getGetInitialImageKeysReceived();
cache.getLogger().info("Delta contains: " + size + " keys");
assertEquals(expectedKeyNum, size);
int num = stats.getDeltaGetInitialImagesCompleted();
cache.getLogger().info("Delta GII completed: " + num + " times");
assertEquals(expectedDeltaGIINum, num);
}
};
vm.invoke(verify);
}
// P should have smaller diskstore ID than R's
protected void assignVMsToPandR(final VM vm0, final VM vm1) {
DiskStoreID dsid0 = getMemberID(vm0);
DiskStoreID dsid1 = getMemberID(vm1);
int compare = dsid0.compareTo(dsid1);
LogWriterUtils.getLogWriter().info("Before assignVMsToPandR, dsid0 is " + dsid0 + ",dsid1 is "
+ dsid1 + ",compare=" + compare);
if (compare > 0) {
P = vm0;
R = vm1;
} else {
P = vm1;
R = vm0;
}
LogWriterUtils.getLogWriter().info("After assignVMsToPandR, P is " + P.getId() + "; R is "
+ R.getId() + " for region " + REGION_NAME);
}
private DiskStoreID getMemberID(VM vm) {
SerializableCallable getDiskStoreID = new SerializableCallable("get DiskStoreID as member id") {
@Override
public Object call() {
LocalRegion lr = (LocalRegion) getCache().getRegion(REGION_NAME);
assertTrue(lr != null && lr.getDiskStore() != null);
DiskStoreID dsid = lr.getDiskStore().getDiskStoreID();
return dsid;
}
};
return (DiskStoreID) vm.invoke(getDiskStoreID);
}
private void forceGC(VM vm, final int count) {
vm.invoke(new SerializableCallable("force GC") {
@Override
public Object call() throws Exception {
((GemFireCacheImpl) getCache()).getTombstoneService().forceBatchExpirationForTests(count);
return null;
}
});
}
private void forceAddGIICount(VM vm) {
vm.invoke(new SerializableCallable("force to add gii count") {
@Override
public Object call() throws Exception {
((GemFireCacheImpl) getCache()).getTombstoneService().incrementGCBlockCount();
return null;
}
});
}
private void assertDeltaGIICountBeZero(VM vm) {
vm.invoke(new SerializableCallable("assert progressingDeltaGIICount == 0") {
@Override
public Object call() throws Exception {
int count = ((GemFireCacheImpl) getCache()).getTombstoneService().getGCBlockCount();
assertEquals(0, count);
return null;
}
});
}
public void waitForToVerifyRVV(final VM vm, final DiskStoreID member,
final long expectedRegionVersion, final long[] exceptionList, final long expectedGCVersion) {
SerializableRunnable waitForVerifyRVV = new SerializableRunnable() {
private boolean verifyExceptionList(final DiskStoreID member, final long regionversion,
final RegionVersionVector rvv, final long[] exceptionList) {
boolean exceptionListVerified = true;
if (exceptionList != null) {
for (long i : exceptionList) {
exceptionListVerified = !rvv.contains(member, i);
if (!exceptionListVerified) {
LogWriterUtils.getLogWriter().finer("DeltaGII:missing exception " + i + ":" + rvv);
break;
}
}
} else {
// expect no exceptionlist
for (long i = 1; i <= regionversion; i++) {
if (!rvv.contains(member, i)) {
exceptionListVerified = false;
LogWriterUtils.getLogWriter().finer("DeltaGII:unexpected exception " + i);
break;
}
}
}
return exceptionListVerified;
}
@Override
public void run() {
WaitCriterion ev = new WaitCriterion() {
@Override
public boolean done() {
RegionVersionVector rvv = ((LocalRegion) getCache().getRegion(REGION_NAME))
.getVersionVector().getCloneForTransmission();
long regionversion = getRegionVersionForMember(rvv, member, false);
long gcversion = getRegionVersionForMember(rvv, member, true);
boolean exceptionListVerified =
verifyExceptionList(member, regionversion, rvv, exceptionList);
getLogWriter()
.info("DeltaGII:expected:" + expectedRegionVersion + ":" + expectedGCVersion);
getLogWriter().info("DeltaGII:actual:" + regionversion + ":" + gcversion
+ ":" + exceptionListVerified + ":" + rvv);
boolean match = true;
if (expectedRegionVersion != -1) {
match = match && (regionversion == expectedRegionVersion);
}
if (expectedGCVersion != -1) {
match = match && (gcversion == expectedGCVersion);
}
return match && exceptionListVerified;
}
@Override
public String description() {
RegionVersionVector rvv = ((LocalRegion) getCache().getRegion(REGION_NAME))
.getVersionVector().getCloneForTransmission();
long regionversion = getRegionVersionForMember(rvv, member, false);
long gcversion = getRegionVersionForMember(rvv, member, true);
return "expected (rv" + expectedRegionVersion + ", gc" + expectedGCVersion + ")"
+ " got (rv" + regionversion + ", gc" + gcversion + ")";
}
};
GeodeAwaitility.await().untilAsserted(ev);
RegionVersionVector rvv = ((LocalRegion) getCache().getRegion(REGION_NAME))
.getVersionVector().getCloneForTransmission();
long regionversion = getRegionVersionForMember(rvv, member, false);
long gcversion = getRegionVersionForMember(rvv, member, true);
if (expectedRegionVersion != -1) {
assertEquals(expectedRegionVersion, regionversion);
}
if (expectedGCVersion != -1) {
assertEquals(expectedGCVersion, gcversion);
}
boolean exceptionListVerified =
verifyExceptionList(member, regionversion, rvv, exceptionList);
assertTrue(exceptionListVerified);
}
};
vm.invoke(waitForVerifyRVV);
}
public void waitForCallbackStarted(final VM vm, final GIITestHookType callbacktype) {
SerializableRunnable waitForCallbackStarted = new SerializableRunnable() {
@Override
public void run() {
final GIITestHook callback =
getGIITestHookForCheckingPurpose(callbacktype);
WaitCriterion ev = new WaitCriterion() {
@Override
public boolean done() {
return (callback != null && callback.isRunning);
}
@Override
public String description() {
return null;
}
};
GeodeAwaitility.await().untilAsserted(ev);
if (callback == null || !callback.isRunning) {
fail("GII tesk hook is not started yet");
}
}
};
vm.invoke(waitForCallbackStarted);
}
private VersionTag getVersionTag(VM vm, final String key) {
SerializableCallable getVersionTag = new SerializableCallable("get version tag") {
@Override
public Object call() {
VersionTag tag = ((LocalRegion) getCache().getRegion(REGION_NAME)).getVersionTag(key);
return tag;
}
};
return (VersionTag) vm.invoke(getVersionTag);
}
public void waitToVerifyKey(final VM vm, final String key, final String expect_value) {
SerializableRunnable waitToVerifyKey = new SerializableRunnable() {
@Override
public void run() {
WaitCriterion ev = new WaitCriterion() {
@Override
public boolean done() {
String value = (String) ((LocalRegion) getCache().getRegion(REGION_NAME)).get(key);
if (expect_value == null && value == null) {
return true;
} else {
return (value != null && value.equals(expect_value));
}
}
@Override
public String description() {
return null;
}
};
GeodeAwaitility.await().untilAsserted(ev);
String value = (String) ((LocalRegion) getCache().getRegion(REGION_NAME)).get(key);
assertEquals(expect_value, value);
}
};
vm.invoke(waitToVerifyKey);
}
protected byte[] getRVVByteArray(VM vm, final String regionName)
throws IOException, ClassNotFoundException {
SerializableCallable getRVVByteArray = new SerializableCallable("getRVVByteArray") {
@Override
public Object call() throws Exception {
Cache cache = getCache();
LocalRegion region = (LocalRegion) cache.getRegion(regionName);
RegionVersionVector rvv = region.getVersionVector();
rvv = rvv.getCloneForTransmission();
HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
// Using gemfire serialization because
// RegionVersionVector is not java serializable
DataSerializer.writeObject(rvv, hdos);
return hdos.toByteArray();
}
};
byte[] result = (byte[]) vm.invoke(getRVVByteArray);
return result;
}
protected RegionVersionVector getRVV(VM vm) throws IOException, ClassNotFoundException {
byte[] result = getRVVByteArray(vm, REGION_NAME);
ByteArrayInputStream bais = new ByteArrayInputStream(result);
return DataSerializer.readObject(new DataInputStream(bais));
}
protected RegionVersionVector getDiskRVV(VM vm) throws IOException, ClassNotFoundException {
SerializableCallable createData = new SerializableCallable("getRVV") {
@Override
public Object call() throws Exception {
Cache cache = getCache();
LocalRegion region = (LocalRegion) cache.getRegion(REGION_NAME);
RegionVersionVector rvv = region.getDiskRegion().getRegionVersionVector();
rvv = rvv.getCloneForTransmission();
HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
// Using gemfire serialization because
// RegionVersionVector is not java serializable
DataSerializer.writeObject(rvv, hdos);
return hdos.toByteArray();
}
};
byte[] result = (byte[]) vm.invoke(createData);
ByteArrayInputStream bais = new ByteArrayInputStream(result);
return DataSerializer.readObject(new DataInputStream(bais));
}
private void checkIfFullGII(VM vm, final String regionName, final byte[] remote_rvv_bytearray,
final boolean expectFullGII) {
SerializableRunnable checkIfFullGII = new SerializableRunnable("check if full gii") {
@Override
public void run() {
DistributedRegion rr = (DistributedRegion) getCache().getRegion(regionName);
ByteArrayInputStream bais = new ByteArrayInputStream(remote_rvv_bytearray);
RegionVersionVector remote_rvv = null;
try {
remote_rvv = DataSerializer.readObject(new DataInputStream(bais));
} catch (IOException e) {
Assert.fail("Unexpected exception", e);
} catch (ClassNotFoundException e) {
Assert.fail("Unexpected exception", e);
}
RequestImageMessage rim = new RequestImageMessage();
rim.setSender(R_ID);
boolean isFullGII = rim.goWithFullGII(rr, remote_rvv);
assertEquals(expectFullGII, isFullGII);
}
};
vm.invoke(checkIfFullGII);
}
private void doOneClear(final VM vm, final long regionVersionForThisOp) {
SerializableRunnable clearOp = oneClearOp(regionVersionForThisOp, getMemberID(vm));
vm.invoke(clearOp);
}
private SerializableRunnable oneClearOp(final long regionVersionForThisOp,
final DiskStoreID memberID) {
SerializableRunnable clearOp = new SerializableRunnable("clear now") {
@Override
public void run() {
DistributedRegion rr = (DistributedRegion) getCache().getRegion(REGION_NAME);
rr.clear();
long region_version = getRegionVersionForMember(rr.getVersionVector(), memberID, false);
long region_gc_version = getRegionVersionForMember(rr.getVersionVector(), memberID, true);
assertEquals(regionVersionForThisOp, region_version);
assertEquals(region_version, region_gc_version);
}
};
return clearOp;
}
private SerializableRunnable onePutOp(final String key, final String value,
final long regionVersionForThisOp, final DiskStoreID memberID, final boolean syncInvocation) {
SerializableRunnable putOp = new SerializableRunnable("put " + key) {
@Override
public void run() {
LocalRegion lr = (LocalRegion) getCache().getRegion(REGION_NAME);
lr.put(key, value);
long region_version = getRegionVersionForMember(lr.getVersionVector(), memberID, false);
if (syncInvocation) {
assertEquals(regionVersionForThisOp, region_version);
}
}
};
return putOp;
}
private void doOnePut(final VM vm, final long regionVersionForThisOp, final String key) {
SerializableRunnable putOp =
onePutOp(key, generateValue(vm), regionVersionForThisOp, getMemberID(vm), true);
vm.invoke(putOp);
}
private AsyncInvocation doOnePutAsync(final VM vm, final long regionVersionForThisOp,
final String key) {
SerializableRunnable putOp =
onePutOp(key, generateValue(vm), regionVersionForThisOp, getMemberID(vm), false);
AsyncInvocation async = vm.invokeAsync(putOp);
return async;
}
private String generateValue(final VM vm) {
return "VALUE from vm" + vm.getId();
}
private SerializableRunnable oneDestroyOp(final String key, final String value,
final long regionVersionForThisOp, final DiskStoreID memberID, final boolean syncInvocation) {
SerializableRunnable destroyOp = new SerializableRunnable("destroy " + key) {
@Override
public void run() {
LocalRegion lr = (LocalRegion) getCache().getRegion(REGION_NAME);
lr.destroy(key);
long region_version = getRegionVersionForMember(lr.getVersionVector(), memberID, false);
if (syncInvocation) {
assertEquals(regionVersionForThisOp, region_version);
}
}
};
return destroyOp;
}
private void doOneDestroy(final VM vm, final long regionVersionForThisOp, final String key) {
SerializableRunnable destroyOp =
oneDestroyOp(key, generateValue(vm), regionVersionForThisOp, getMemberID(vm), true);
vm.invoke(destroyOp);
}
private AsyncInvocation doOneDestroyAsync(final VM vm, final long regionVersionForThisOp,
final String key) {
SerializableRunnable destroyOp =
oneDestroyOp(key, generateValue(vm), regionVersionForThisOp, getMemberID(vm), false);
AsyncInvocation async = vm.invokeAsync(destroyOp);
return async;
}
private long getRegionVersionForMember(RegionVersionVector rvv, DiskStoreID member,
boolean isRVVGC) {
long ret = 0;
if (isRVVGC) {
ret = rvv.getGCVersion(member);
} else {
ret = rvv.getVersionForMember(member);
}
return (ret == -1 ? 0 : ret);
}
private void assertSameRVV(RegionVersionVector rvv1, RegionVersionVector rvv2) {
if (!rvv1.sameAs(rvv2)) {
fail("Expected " + rvv1 + " but was " + rvv2);
}
}
protected void verifyTombstoneExist(VM vm, final String key, final boolean expectExist,
final boolean expectExpired) {
SerializableRunnable verify = new SerializableRunnable() {
private boolean doneVerify() {
Cache cache = getCache();
LocalRegion lr = (LocalRegion) getCache().getRegion(REGION_NAME);
NonTXEntry entry = (NonTXEntry) lr.getEntry(key, true);
if (expectExist) {
assertTrue(entry != null && entry.getRegionEntry().isTombstone());
}
System.out.println("GGG:new timeout=" + TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT);
if (entry == null || !entry.getRegionEntry().isTombstone()) {
return (false == expectExist);
} else {
long ts = entry.getRegionEntry().getVersionStamp().getVersionTimeStamp();
if (expectExpired) {
return (ts + TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT <= ((GemFireCacheImpl) cache)
.cacheTimeMillis()); // use MAX_WAIT as timeout
} else {
return (true == expectExist);
}
}
}
@Override
public void run() {
WaitCriterion ev = new WaitCriterion() {
@Override
public boolean done() {
return doneVerify();
}
@Override
public String description() {
return null;
}
};
GeodeAwaitility.await().untilAsserted(ev);
assertTrue(doneVerify());
}
};
vm.invoke(verify);
}
protected int getDeltaGIICount(VM vm) {
SerializableCallable getDelGIICount = new SerializableCallable("getDelGIICount") {
@Override
public Object call() throws Exception {
GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
return gfc.getTombstoneService().getGCBlockCount();
}
};
int result = (Integer) vm.invoke(getDelGIICount);
return result;
}
private void checkAsyncCall(AsyncInvocation async) {
try {
async.join(30000);
if (async.exceptionOccurred()) {
Assert.fail("Test failed", async.getException());
}
} catch (InterruptedException e1) {
Assert.fail("Test failed", e1);
}
}
public static void slowGII(final long[] versionsToBlock) {
DistributionMessageObserver.setInstance(new BlockMessageObserver(versionsToBlock));
}
public static void resetSlowGII() {
BlockMessageObserver observer =
(BlockMessageObserver) DistributionMessageObserver.setInstance(null);
if (observer != null) {
observer.cdl.countDown();
}
}
// private void slowGII(VM vm) {
// SerializableRunnable slowGII = new SerializableRunnable("Slow down GII") {
// @SuppressWarnings("synthetic-access")
// public void run() {
// slowGII();
// }
// };
// vm.invoke(slowGII);
// }
//
// private void resetSlowGII(VM vm) {
// SerializableRunnable resetSlowGII = new SerializableRunnable("Unset the slow GII") {
// public void run() {
// resetSlowGII();
// }
// };
// vm.invoke(resetSlowGII);
// }
private static class BlockMessageObserver extends DistributionMessageObserver {
private long[] versionsToBlock;
CountDownLatch cdl = new CountDownLatch(1);
BlockMessageObserver(long[] versionsToBlock) {
this.versionsToBlock = versionsToBlock;
}
@Override
public void beforeSendMessage(ClusterDistributionManager dm, DistributionMessage message) {
VersionTag tag = null;
if (message instanceof UpdateMessage) {
UpdateMessage um = (UpdateMessage) message;
tag = um.getVersionTag();
} else if (message instanceof DestroyMessage) {
DestroyMessage um = (DestroyMessage) message;
tag = um.getVersionTag();
} else {
return;
}
try {
boolean toBlock = false;
for (long blockversion : versionsToBlock) {
if (tag.getRegionVersion() == blockversion) {
toBlock = true;
break;
}
}
if (toBlock) {
cdl.await();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
private class Mycallback extends GIITestHook {
private Object lockObject = new Object();
public Mycallback(GIITestHookType type, String region_name) {
super(type, region_name);
}
@Override
public void reset() {
synchronized (this.lockObject) {
this.lockObject.notify();
}
}
@Override
public void run() {
synchronized (this.lockObject) {
try {
isRunning = true;
this.lockObject.wait();
} catch (InterruptedException e) {
}
}
}
} // Mycallback
}