blob: dc68ad9260308ecb9f0cc33a4471a74e6bf0a019 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache30;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.Arrays;
import java.util.Set;
import org.junit.Ignore;
import org.junit.Test;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.Scope;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.StateFlushOperation;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.ThreadUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.WaitCriterion;
/**
* This class tests the functionality of a cache {@link Region region} that has a scope of
* {@link Scope#DISTRIBUTED_NO_ACK distributed no ACK}.
*
* @since GemFire 3.0
*/
public class DistributedNoAckRegionDUnitTest extends MultiVMRegionTestCase {
/**
* Returns region attributes for a <code>GLOBAL</code> region
*/
@Override
protected RegionAttributes getRegionAttributes() {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_NO_ACK);
factory.setDataPolicy(DataPolicy.PRELOADED);
factory.setConcurrencyChecksEnabled(false);
return factory.create();
}
/**
* Tests creating a distributed subregion of a local scope region, which should fail.
*/
@Test
public void testDistSubregionOfLocalRegion() throws Exception {
// creating a distributed subregion of a LOCAL region is illegal.
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
createRootRegion(factory.create());
try {
createRegion(getUniqueName());
fail("Should have thrown an IllegalStateException");
} catch (IllegalStateException e) {
// pass
}
}
/**
* Tests the compatibility of creating certain kinds of subregions of a local region.
*
* @see Region#createSubregion
*/
@Test
public void testIncompatibleSubregions() throws Exception {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
// Scope.GLOBAL is illegal if there is any other cache in the
// distributed system that has the same region with
// Scope.DISTRIBUTED_NO_ACK.
final String name = this.getUniqueName() + "-NO_ACK";
vm0.invoke(new SerializableRunnable("Create NO ACK Region") {
@Override
public void run() {
try {
Region region = createRegion(name, "INCOMPATIBLE_ROOT", getRegionAttributes());
assertTrue(
getRootRegion("INCOMPATIBLE_ROOT").getAttributes().getScope().isDistributedNoAck());
assertTrue(region.getAttributes().getScope().isDistributedNoAck());
} catch (CacheException ex) {
Assert.fail("While creating NO ACK region", ex);
}
}
});
vm1.invoke(new SerializableRunnable("Create GLOBAL Region") {
@Override
public void run() {
try {
AttributesFactory factory = new AttributesFactory(getRegionAttributes());
factory.setScope(Scope.GLOBAL);
assertNull(getRootRegion("INCOMPATIBLE_ROOT"));
try {
createRootRegion("INCOMPATIBLE_ROOT", factory.create());
fail("Should have thrown an IllegalStateException");
} catch (IllegalStateException ex) {
// pass...
}
} catch (CacheException ex) {
Assert.fail("While creating GLOBAL Region", ex);
}
}
});
vm1.invoke(new SerializableRunnable("Create ACK Region") {
@Override
public void run() {
try {
AttributesFactory factory = new AttributesFactory(getRegionAttributes());
factory.setScope(Scope.DISTRIBUTED_ACK);
assertNull(getRootRegion("INCOMPATIBLE_ROOT"));
try {
createRootRegion("INCOMPATIBLE_ROOT", factory.create());
fail("Should have thrown an IllegalStateException");
} catch (IllegalStateException ex) {
// pass...
}
} catch (CacheException ex) {
Assert.fail("While creating ACK Region", ex);
}
}
});
}
private static final int CHUNK_SIZE = 500 * 1024; // == InitialImageOperation.CHUNK_SIZE_IN_BYTES
// use sizes so it completes in < 15 sec, but hangs if bug exists
private static final int NUM_ENTRIES_VM = 15000;
private static final int VALUE_SIZE_VM = CHUNK_SIZE * 150 / NUM_ENTRIES_VM;
private static final int NUM_PUTS = 100000;
protected static volatile boolean stopPutting = false;
/**
* Messages pile up in overflow queue during long GetInitialImage This test was disabled since we
* not longer have an overflow queue and GII is now non-blocking (bug 30705 was caused blocking
* gii). This test can take a long time to run on disk regions.
*/
@Ignore("TODO")
@Test
public void testBug30705() throws Exception {
final String name = this.getUniqueName();
final int numEntries = NUM_ENTRIES_VM;
final int valueSize = VALUE_SIZE_VM;
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm2 = host.getVM(2);
SerializableRunnable create = new CacheSerializableRunnable("Create Mirrored Region") {
@Override
public void run2() throws CacheException {
LogWriterUtils.getLogWriter().info("testBug30705: Start creating Mirrored Region");
AttributesFactory factory = new AttributesFactory(getRegionAttributes());
factory.setDataPolicy(DataPolicy.REPLICATE);
createRegion(name, factory.create());
LogWriterUtils.getLogWriter().info("testBug30705: Finished creating Mirrored Region");
}
};
SerializableRunnable put = new CacheSerializableRunnable("Distributed NoAck Puts") {
@Override
public void run2() throws CacheException {
Region rgn = getCache().getRegion("/root/" + name);
assertNotNull(rgn);
Object key = new Integer(0x42);
Object value = new byte[0];
assertNotNull(value);
LogWriterUtils.getLogWriter().info("testBug30705: Started Distributed NoAck Puts");
for (int i = 0; i < NUM_PUTS; i++) {
if (stopPutting) {
LogWriterUtils.getLogWriter()
.info("testBug30705: Interrupted Distributed Ack Puts after " + i + " PUTS");
break;
}
if ((i % 1000) == 0) {
LogWriterUtils.getLogWriter().info("testBug30705: modification #" + i);
}
rgn.put(key, value);
}
}
};
vm0.invoke(create);
vm0.invoke(new CacheSerializableRunnable("Put data") {
@Override
public void run2() throws CacheException {
LogWriterUtils.getLogWriter().info("testBug30705: starting initial data load");
Region region = getRootRegion().getSubregion(name);
final byte[] value = new byte[valueSize];
Arrays.fill(value, (byte) 0x42);
for (int i = 0; i < numEntries; i++) {
if ((i % 1000) == 0) {
LogWriterUtils.getLogWriter().info("testBug30705: initial put #" + i);
}
region.put(new Integer(i), value);
}
LogWriterUtils.getLogWriter().info("testBug30705: finished initial data load");
}
});
// start putting
AsyncInvocation async = vm0.invokeAsync(put);
// do initial image
try {
LogWriterUtils.getLogWriter().info("testBug30705: before the critical create");
vm2.invoke(create);
LogWriterUtils.getLogWriter().info("testBug30705: after the critical create");
} finally {
// test passes if this does not hang
LogWriterUtils.getLogWriter()
.info("testBug30705: INTERRUPTING Distributed NoAck Puts after GetInitialImage");
vm0.invoke(new SerializableRunnable("Interrupt Puts") {
@Override
public void run() {
LogWriterUtils.getLogWriter().info("testBug30705: interrupting putter");
stopPutting = true;
}
});
ThreadUtils.join(async, 30 * 1000);
// wait for overflow queue to quiesce before continuing
vm2.invoke(new SerializableRunnable("Wait for Overflow Queue") {
@Override
public void run() {
WaitCriterion ev = new WaitCriterion() {
@Override
public boolean done() {
return getSystem().getDistributionManager().getStats().getOverflowQueueSize() == 0;
}
@Override
public String description() {
return "overflow queue remains nonempty";
}
};
GeodeAwaitility.await().untilAsserted(ev);
}
});
} // finally
LogWriterUtils.getLogWriter().info("testBug30705: at end of test");
if (async.exceptionOccurred()) {
Assert.fail("Got exception", async.getException());
}
}
@Override
protected void pauseIfNecessary(int ms) {
Wait.pause(ms);
}
@Override
protected void pauseIfNecessary() {
Wait.pause();
}
@Override
protected void flushIfNecessary(Region r) {
DistributedRegion dr = (DistributedRegion) r;
Set<InternalDistributedMember> targets = dr.getDistributionAdvisor().adviseCacheOp();
StateFlushOperation.flushTo(targets, dr);
}
/**
* The number of milliseconds to try repeating validation code in the event that AssertionError is
* thrown. For DISTRIBUTED_NO_ACK scopes, a repeat timeout is used to account for the fact that a
* previous operation may have not yet completed.
*/
@Override
protected long getRepeatTimeoutMs() {
return 120 * 1000;
}
}