blob: 15339f9f0256c178355d30eb3e84bca190672a92 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.PartitionAttributes;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.internal.cache.control.InternalResourceManager;
import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceObserverAdapter;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
@SuppressWarnings("synthetic-access")
public class PartitionedRegionDelayedRecoveryDUnitTest extends JUnit4CacheTestCase {
public PartitionedRegionDelayedRecoveryDUnitTest() {
super();
}
@Override
public final void postTearDownCacheTestCase() throws Exception {
Invoke.invokeInEveryVM(new SerializableRunnable() {
@Override
public void run() {
InternalResourceManager.setResourceObserver(null);
}
});
InternalResourceManager.setResourceObserver(null);
}
@Test
public void testNoRecovery() throws Exception {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
SerializableRunnable createPrRegions = new SerializableRunnable("createRegions") {
@Override
public void run() {
Cache cache = getCache();
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRecoveryDelay(-1);
paf.setStartupRecoveryDelay(-1);
paf.setRedundantCopies(1);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
cache.createRegion("region1", attr.create());
}
};
// create the region in 2 VMS
vm0.invoke(createPrRegions);
vm1.invoke(createPrRegions);
// Do 1 put, which should create 1 bucket on both Vms
vm0.invoke(new SerializableRunnable("putData") {
@Override
public void run() {
Cache cache = getCache();
PartitionedRegion region1 = (PartitionedRegion) cache.getRegion("region1");
region1.put("A", "B");
}
});
// create the PR on another region, which won't have the bucket
vm2.invoke(createPrRegions);
// destroy the region in 1 of the VM's that's hosting the bucket
vm1.invoke(new SerializableRunnable("Destroy region") {
@Override
public void run() {
Cache cache = getCache();
PartitionedRegion region1 = (PartitionedRegion) cache.getRegion("region1");
region1.localDestroyRegion();
}
});
// check to make sure we didn't make a copy of the low redundancy bucket
SerializableRunnable checkNoBucket = new SerializableRunnable("Check for bucket") {
@Override
public void run() {
Cache cache = getCache();
PartitionedRegion region1 = (PartitionedRegion) cache.getRegion("region1");
assertEquals(0, region1.getDataStore().getBucketsManaged());
}
};
// Wait for a bit, maybe the region will try to make a copy of the bucket
Thread.sleep(1000);
vm2.invoke(checkNoBucket);
// recreate the region on VM1
vm1.invoke(createPrRegions);
// Wait for a bit, maybe the region will try to make a copy of the bucket
Thread.sleep(1000);
vm1.invoke(checkNoBucket);
vm2.invoke(checkNoBucket);
}
@Test
public void testDelay() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
SerializableRunnable createPrRegions = new SerializableRunnable("createRegions") {
@Override
public void run() {
final CountDownLatch rebalancingFinished = new CountDownLatch(1);
InternalResourceManager.setResourceObserver(new ResourceObserverAdapter() {
@Override
public void rebalancingOrRecoveryFinished(Region region) {
rebalancingFinished.countDown();
}
});
try {
Cache cache = getCache();
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setRecoveryDelay(5000);
paf.setRedundantCopies(1);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
cache.createRegion("region1", attr.create());
if (!rebalancingFinished.await(60000, TimeUnit.MILLISECONDS)) {
fail("Redundancy recovery did not happen within 60 seconds");
}
} catch (InterruptedException e) {
Assert.fail("interrupted", e);
} finally {
InternalResourceManager.setResourceObserver(null);
}
}
};
// create the region in 2 VMS
vm0.invoke(createPrRegions);
vm1.invoke(createPrRegions);
// Do 1 put, which should create 1 bucket
vm0.invoke(new SerializableRunnable("putData") {
@Override
public void run() {
Cache cache = getCache();
PartitionedRegion region1 = (PartitionedRegion) cache.getRegion("region1");
region1.put("A", "B");
}
});
// create the region in a third VM, which won't have any buckets
vm2.invoke(createPrRegions);
final long begin = System.currentTimeMillis();
// close 1 cache, which should make the bucket drop below
// the expected redundancy level.
vm1.invoke(new SerializableRunnable("close cache") {
@Override
public void run() {
Cache cache = getCache();
cache.close();
}
});
long elapsed = waitForBucketRecovery(vm2, 1, begin);
assertTrue("Did not wait at least 5 seconds to create the bucket. Elapsed=" + elapsed,
elapsed >= 5000);
}
@Test
public void testStartupDelay() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
SerializableRunnable createPrRegions = new SerializableRunnable("createRegions") {
@Override
public void run() {
Cache cache = getCache();
InternalResourceManager.setResourceObserver(new MyResourceObserver());
AttributesFactory attr = new AttributesFactory();
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setStartupRecoveryDelay(5000);
paf.setRedundantCopies(1);
PartitionAttributes prAttr = paf.create();
attr.setPartitionAttributes(prAttr);
cache.createRegion("region1", attr.create());
}
};
// create the region in 2 VMS
vm0.invoke(createPrRegions);
vm1.invoke(createPrRegions);
// Do 1 put, which should create 1 bucket
vm0.invoke(new SerializableRunnable("putData") {
@Override
public void run() {
Cache cache = getCache();
PartitionedRegion region1 = (PartitionedRegion) cache.getRegion("region1");
region1.put(1, "B");
region1.put(2, "B");
region1.put(3, "B");
region1.put(4, "B");
}
});
// close 1 cache, which should make the bucket drop below
// the expected redundancy level.
vm1.invoke(new SerializableRunnable("close cache") {
@Override
public void run() {
Cache cache = getCache();
cache.close();
}
});
final long begin = System.currentTimeMillis();
// create the region in a third VM, which won't have any buckets
vm2.invoke(createPrRegions);
long elapsed = System.currentTimeMillis() - begin;
assertTrue("Create region should not have waited to recover redundancy. Elapsed=" + elapsed,
elapsed < 5000);
// wait for the bucket to be copied
elapsed = waitForBucketRecovery(vm2, 4, begin);
assertTrue("Did not wait at least 5 seconds to create the bucket. Elapsed=" + elapsed,
elapsed >= 5000);
vm2.invoke(new SerializableCallable("wait for primary move") {
@Override
public Object call() throws Exception {
Cache cache = getCache();
MyResourceObserver observer =
(MyResourceObserver) InternalResourceManager.getResourceObserver();
observer.waitForRecovery(30, TimeUnit.SECONDS);
PartitionedRegion region1 = (PartitionedRegion) cache.getRegion("region1");
assertEquals(2, region1.getDataStore().getNumberOfPrimaryBucketsManaged());
return null;
}
});
}
private long waitForBucketRecovery(VM vm2, final int numBuckets, final long begin) {
// wait for the bucket to be copied
Long elapsed = (Long) vm2.invoke(new SerializableCallable("putData") {
@Override
public Object call() {
Cache cache = getCache();
PartitionedRegion region1 = (PartitionedRegion) cache.getRegion("region1");
while (System.currentTimeMillis() - begin < 30000) {
int bucketsManaged = region1.getDataStore().getBucketsManaged();
if (bucketsManaged == numBuckets) {
break;
} else {
try {
Thread.sleep(100);
} catch (InterruptedException e) { // TODO: don't catch InterruptedException -- let test
// fail!
e.printStackTrace();
}
}
}
assertEquals("Did not start managing the bucket within 30 seconds", numBuckets,
region1.getDataStore().getBucketsManaged());
long elapsed = System.currentTimeMillis() - begin;
return elapsed;
}
});
return elapsed;
}
private static class MyResourceObserver extends ResourceObserverAdapter {
CountDownLatch recoveryComplete = new CountDownLatch(1);
public void waitForRecovery(long time, TimeUnit unit) throws InterruptedException {
recoveryComplete.await(time, unit);
}
@Override
public void rebalancingOrRecoveryFinished(Region region) {
recoveryComplete.countDown();
}
}
}